This commit is contained in:
jokob-sk
2025-03-01 09:44:15 +11:00
4 changed files with 200 additions and 167 deletions

View File

@@ -65,7 +65,7 @@
- Settings: - Settings:
- ![settings_example](/front/plugins/omada_sdn_openapi_import/omada_sdn_openapi_import_settings.png) - ![settings_example](/front/plugins/omada_sdn_openapi/omada_sdn_openapi_settings.png)
### Other info ### Other info

View File

@@ -331,7 +331,7 @@
} }
] ]
}, },
"default_value": "python3 /app/front/plugins/omada_sdn_openapi_import/script.py", "default_value": "python3 /app/front/plugins/omada_sdn_openapi/script.py",
"options": [], "options": [],
"localized": ["name", "description"], "localized": ["name", "description"],
"name": [ "name": [

View File

Before

Width:  |  Height:  |  Size: 176 KiB

After

Width:  |  Height:  |  Size: 176 KiB

View File

@@ -9,7 +9,7 @@ However, I found that approach somewhat unstable, so I decided
to give it a shot and create a new plugin with the goal of providing to give it a shot and create a new plugin with the goal of providing
same, but more reliable results. same, but more reliable results.
Please note that this is my first plugin, and Im not a Python developer. Please note that this is my first plugin, and I'm not a Python developer.
Any comments, bug fixes, or contributions are greatly appreciated. Any comments, bug fixes, or contributions are greatly appreciated.
Author: https://github.com/xfilo Author: https://github.com/xfilo
@@ -17,6 +17,8 @@ Author: https://github.com/xfilo
__author__ = "xfilo" __author__ = "xfilo"
__version__ = 0.1 # Initial version __version__ = 0.1 # Initial version
__version__ = 0.2 # Rephrased error messages, improved logging and code logic
__version__ = 0.3 # Refactored data collection into a class, improved code clarity with comments
import os import os
import sys import sys
@@ -59,7 +61,7 @@ urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
class OmadaHelper: class OmadaHelper:
@staticmethod @staticmethod
def log(message: str, level: Literal["minimal", "verbose", "debug", "trace"] = "minimal") -> None: def log(message: str, level: Literal["minimal", "verbose", "debug", "trace"] = "minimal") -> None:
mylog(level, [f"[{pluginName}] {message}"]) mylog(level, [f"[{pluginName}] [{level[:1].upper()}] {message}"])
@staticmethod @staticmethod
def debug(message: str) -> None: def debug(message: str) -> None:
@@ -75,8 +77,7 @@ class OmadaHelper:
@staticmethod @staticmethod
def response(response_type: str, response_message: str, response_result: Any = None) -> Dict[str, Any]: def response(response_type: str, response_message: str, response_result: Any = None) -> Dict[str, Any]:
return {"response_type": response_type, "response_message": response_message, return {"response_type": response_type, "response_message": response_message, "response_result": response_result}
"response_result": response_result}
@staticmethod @staticmethod
def timestamp_to_datetime(ms: int, timezone: str) -> Dict[str, Any]: def timestamp_to_datetime(ms: int, timezone: str) -> Dict[str, Any]:
@@ -85,6 +86,7 @@ class OmadaHelper:
if not ms or not isinstance(ms, (str, int)): if not ms or not isinstance(ms, (str, int)):
raise ValueError(f"Value '{ms}' is not a valid timestamp") raise ValueError(f"Value '{ms}' is not a valid timestamp")
# Convert UTC millisecond timestamp to datetime in NetAlertX's timezone
timestamp = ms / 1000 timestamp = ms / 1000
tz = pytz.timezone("UTC") tz = pytz.timezone("UTC")
utc_datetime = datetime.fromtimestamp(timestamp, tz=tz) utc_datetime = datetime.fromtimestamp(timestamp, tz=tz)
@@ -111,6 +113,7 @@ class OmadaHelper:
if not mac or not isinstance(mac, str) or mac is None: if not mac or not isinstance(mac, str) or mac is None:
raise Exception(f"Value '{mac}' is not a valid MAC address") raise Exception(f"Value '{mac}' is not a valid MAC address")
# Replace - with : in a MAC address and make it lowercase
result = mac.lower().replace("-", ":") result = mac.lower().replace("-", ":")
msg = f"Normalized MAC address from {mac} to {result}" msg = f"Normalized MAC address from {mac} to {result}"
OmadaHelper.debug(msg) OmadaHelper.debug(msg)
@@ -127,7 +130,9 @@ class OmadaHelper:
if not isinstance(input_data, list): if not isinstance(input_data, list):
raise Exception(f"Expected a list, but got '{type(input_data)}'.") raise Exception(f"Expected a list, but got '{type(input_data)}'.")
OmadaHelper.verbose(f"Starting normalization of {len(input_data)} {input_type}(s) from site {site_name}") OmadaHelper.verbose(f"Starting normalization of {len(input_data)} {input_type}(s) from site: {site_name}")
# The default return structure for one device/client
default_entry = { default_entry = {
"mac_address": "", "mac_address": "",
"ip_address": "", "ip_address": "",
@@ -141,8 +146,11 @@ class OmadaHelper:
} }
result = [] result = []
# Loop through each device/client
for data in input_data: for data in input_data:
# Normalize and verify MAC address
mac = OmadaHelper.normalize_mac(data.get("mac")) mac = OmadaHelper.normalize_mac(data.get("mac"))
if not isinstance(mac, dict) or mac.get("response_type") != "success": if not isinstance(mac, dict) or mac.get("response_type") != "success":
continue continue
@@ -152,58 +160,53 @@ class OmadaHelper:
OmadaHelper.debug(f"Skipping {input_type}, not a MAC address: {mac}") OmadaHelper.debug(f"Skipping {input_type}, not a MAC address: {mac}")
continue continue
# Assigning mandatory return values
entry = default_entry.copy() entry = default_entry.copy()
entry["mac_address"] = mac entry["mac_address"] = mac
entry["ip_address"] = data.get("ip", "") entry["ip_address"] = data.get("ip")
entry["name"] = data.get("name", "") entry["name"] = data.get("name")
# Assign the last datetime the device/client was seen on the network
last_seen = OmadaHelper.timestamp_to_datetime(data.get("lastSeen", 0), timezone) last_seen = OmadaHelper.timestamp_to_datetime(data.get("lastSeen", 0), timezone)
entry["last_seen"] = last_seen.get("response_result") if isinstance(last_seen, dict) and last_seen.get( entry["last_seen"] = last_seen.get("response_result") if isinstance(last_seen, dict) and last_seen.get("response_type") == "success" else ""
"response_type") == "success" else ""
# Applicable only for DEVICE
if input_type == "device": if input_type == "device":
entry["device_type"] = data.get("type") entry["device_type"] = data.get("type")
# If it's not a gateway try to assign parent node MAC
if data.get("type", "") != "gateway": if data.get("type", "") != "gateway":
parent_mac = OmadaHelper.normalize_mac(data.get("uplinkDeviceMac")) parent_mac = OmadaHelper.normalize_mac(data.get("uplinkDeviceMac"))
parent_mac = parent_mac.get("response_result") if isinstance(parent_mac, entry["parent_node_mac_address"] = parent_mac.get("response_result") if isinstance(parent_mac, dict) and parent_mac.get("response_type") == "success" else ""
dict) and parent_mac.get(
"response_type") == "success" else ""
entry["parent_node_mac_address"] = parent_mac
# Applicable only for CLIENT
if input_type == "client": if input_type == "client":
entry["vlan_id"] = data.get("vid") entry["vlan_id"] = data.get("vid")
entry["device_type"] = data.get("deviceType") entry["device_type"] = data.get("deviceType")
# Try to assign parent node MAC and PORT/SSID to the CLIENT
if data.get("connectDevType", "") == "gateway": if data.get("connectDevType", "") == "gateway":
parent_mac = OmadaHelper.normalize_mac(data.get("gatewayMac")) parent_mac = OmadaHelper.normalize_mac(data.get("gatewayMac"))
entry["parent_node_mac_address"] = parent_mac.get("response_result") if isinstance(parent_mac, entry["parent_node_mac_address"] = parent_mac.get("response_result") if isinstance(parent_mac, dict) and parent_mac.get("response_type") == "success" else ""
dict) and parent_mac.get(
"response_type") == "success" else ""
entry["parent_node_port"] = data.get("port", "") entry["parent_node_port"] = data.get("port", "")
elif data.get("connectDevType", "") == "switch": elif data.get("connectDevType", "") == "switch":
parent_mac = OmadaHelper.normalize_mac(data.get("switchMac")) parent_mac = OmadaHelper.normalize_mac(data.get("switchMac"))
entry["parent_node_mac_address"] = parent_mac.get("response_result") if isinstance(parent_mac, entry["parent_node_mac_address"] = parent_mac.get("response_result") if isinstance(parent_mac, dict) and parent_mac.get("response_type") == "success" else ""
dict) and parent_mac.get(
"response_type") == "success" else ""
entry["parent_node_port"] = data.get("port", "") entry["parent_node_port"] = data.get("port", "")
elif data.get("connectDevType", "") == "ap": elif data.get("connectDevType", "") == "ap":
parent_mac = OmadaHelper.normalize_mac(data.get("apMac")) parent_mac = OmadaHelper.normalize_mac(data.get("apMac"))
entry["parent_node_mac_address"] = parent_mac.get("response_result") if isinstance(parent_mac, entry["parent_node_mac_address"] = parent_mac.get("response_result") if isinstance(parent_mac, dict) and parent_mac.get("response_type") == "success" else ""
dict) and parent_mac.get(
"response_type") == "success" else ""
entry["parent_node_ssid"] = data.get("ssid", "") entry["parent_node_ssid"] = data.get("ssid", "")
# Add the entry to the result
result.append(entry) result.append(entry)
OmadaHelper.debug(f"Processed {input_type} entry: {entry}") OmadaHelper.debug(f"Processed {input_type} entry: {entry}")
msg = f"Successfully normalized {len(result)} {input_type}(s) from site: {site_name}" msg = f"Successfully normalized {len(result)} {input_type}(s) from site: {site_name}"
OmadaHelper.verbose(msg) OmadaHelper.minimal(msg)
final_result = OmadaHelper.response("success", msg, result) return OmadaHelper.response("success", msg, result)
except Exception as ex: except Exception as ex:
msg = f"Failed normalizing {input_type}(s) from site '{site_name}' - error: {str(ex)}" msg = f"Failed normalizing {input_type}(s) from site '{site_name}' - error: {str(ex)}"
OmadaHelper.verbose(msg) OmadaHelper.verbose(msg)
final_result = OmadaHelper.response("error", msg) return OmadaHelper.response("error", msg)
return final_result
class OmadaAPI: class OmadaAPI:
@@ -223,14 +226,18 @@ class OmadaAPI:
# Validate and set attributes # Validate and set attributes
for param_name, param_info in params.items(): for param_name, param_info in params.items():
# Get user parameter input, or default value if any
value = options.get(param_name, param_info.get("default")) value = options.get(param_name, param_info.get("default"))
# Check if a parameter is required and if it's value is non-empty
if param_info["required"] and (value is None or (param_info["type"] == str and not value)): if param_info["required"] and (value is None or (param_info["type"] == str and not value)):
raise ValueError(f"{param_name} is required and must be a non-empty {param_info['type'].__name__}") raise ValueError(f"{param_name} is required and must be a non-empty {param_info['type'].__name__}")
# Check if a parameter has a correct datatype
if not isinstance(value, param_info["type"]): if not isinstance(value, param_info["type"]):
raise TypeError(f"{param_name} must be of type {param_info['type'].__name__}") raise TypeError(f"{param_name} must be of type {param_info['type'].__name__}")
# Assign the parameter to the class
setattr(self, param_name, value) setattr(self, param_name, value)
OmadaHelper.debug(f"Initialized option '{param_name}' with value: {value}") OmadaHelper.debug(f"Initialized option '{param_name}' with value: {value}")
@@ -245,6 +252,7 @@ class OmadaAPI:
def _get_headers(self, include_auth: bool = True) -> dict: def _get_headers(self, include_auth: bool = True) -> dict:
"""Return request headers.""" """Return request headers."""
headers = {"Content-type": "application/json"} headers = {"Content-type": "application/json"}
# Add access token to header if requested and available
if include_auth == True: if include_auth == True:
if not self.access_token: if not self.access_token:
OmadaHelper.debug("No access token available for headers") OmadaHelper.debug("No access token available for headers")
@@ -254,47 +262,58 @@ class OmadaAPI:
return headers return headers
def _make_request(self, method: str, endpoint: str, **kwargs: Any) -> Dict[str, Any]: def _make_request(self, method: str, endpoint: str, **kwargs: Any) -> Dict[str, Any]:
"""Make a request to an endpoint."""
time.sleep(1) # Sleep before making any request so it does not rate-limited time.sleep(1) # Sleep before making any request so it does not rate-limited
OmadaHelper.verbose(f"{method} request to endpoint: {endpoint}") OmadaHelper.debug(f"{method} request to endpoint: {endpoint}")
url = f"{getattr(self, 'host')}{endpoint}" url = f"{getattr(self, 'host')}{endpoint}"
headers = self._get_headers(kwargs.pop('include_auth', True)) headers = self._get_headers(kwargs.pop('include_auth', True))
try: try:
# Make the request and get the response
response = requests.request(method, url, headers=headers, verify=getattr(self, 'verify_ssl'), **kwargs) response = requests.request(method, url, headers=headers, verify=getattr(self, 'verify_ssl'), **kwargs)
response.raise_for_status() response.raise_for_status()
data = response.json() data = response.json()
# Check if the response contains an error code and determine the function response type
response_type = "error" if data.get("errorCode", 0) != 0 else "success" response_type = "error" if data.get("errorCode", 0) != 0 else "success"
msg = f"{method} request completed: {endpoint}" msg = f"{method} request completed: {endpoint}"
OmadaHelper.minimal(msg) OmadaHelper.verbose(msg)
return OmadaHelper.response(response_type, msg, data) return OmadaHelper.response(response_type, msg, data)
except requests.exceptions.RequestException as ex: except requests.exceptions.RequestException as ex:
msg = f"{method} request failed: {str(ex)}" OmadaHelper.minimal(f"{method} request failed: {url}")
OmadaHelper.minimal(f"{method} request to {endpoint} failed") OmadaHelper.verbose(f"{method} request error: {str(ex)}")
OmadaHelper.verbose(msg) return OmadaHelper.response("error", f"{method} request failed to endpoint '{endpoint}' with error: {str(ex)}")
return OmadaHelper.response("error", msg)
def authenticate(self) -> Dict[str, any]: def authenticate(self) -> Dict[str, any]:
"""Make an endpoint request to get access token.""" """Make an endpoint request to get access token."""
OmadaHelper.verbose("Starting authentication process") OmadaHelper.verbose("Starting authentication process")
# Endpoint request
endpoint = "/openapi/authorize/token?grant_type=client_credentials" endpoint = "/openapi/authorize/token?grant_type=client_credentials"
payload = { payload = {
"omadacId": getattr(self, 'omada_id'), "omadacId": getattr(self, 'omada_id'),
"client_id": getattr(self, 'client_id'), "client_id": getattr(self, 'client_id'),
"client_secret": getattr(self, 'client_secret') "client_secret": getattr(self, 'client_secret')
} }
response = self._make_request("POST", endpoint, json=payload, include_auth=False) response = self._make_request("POST", endpoint, json=payload, include_auth=False)
if response["response_type"] == "success":
token_data = response["response_result"]
if token_data.get("errorCode") == 0:
self.access_token = token_data["result"]["accessToken"]
self.refresh_token = token_data["result"]["refreshToken"]
OmadaHelper.minimal("Authentication successful")
return OmadaHelper.response("success", "Authenticated successfully")
OmadaHelper.minimal("Authentication failed") # Successful endpoint response
if response.get("response_type") == "success":
response_result = response.get("response_result")
error_code = response_result.get("errorCode")
access_token = response_result.get("result").get("accessToken")
refresh_token = response_result.get("result").get("refreshToken")
# Authentication is successful if there isn't a response error, and access_token and refresh_token are set
if error_code == 0 and access_token and refresh_token:
self.access_token = access_token
self.refresh_token = refresh_token
msg = "Successfully authenticated"
OmadaHelper.minimal(msg)
return OmadaHelper.response("success", msg)
# Failed authentication
OmadaHelper.debug(f"Authentication response: {response}") OmadaHelper.debug(f"Authentication response: {response}")
return OmadaHelper.response("error", return OmadaHelper.response("error", f"Authentication failed - error: {response.get('response_message', 'Not provided')}")
f"Authentication failed - error: {response.get('response_result').get('msg')}")
def get_clients(self, site_id: str) -> Dict[str, Any]: def get_clients(self, site_id: str) -> Dict[str, Any]:
"""Make an endpoint request to get all online clients on a site.""" """Make an endpoint request to get all online clients on a site."""
@@ -308,52 +327,48 @@ class OmadaAPI:
endpoint = f"/openapi/v1/{getattr(self, 'omada_id')}/sites/{site_id}/devices?page=1&pageSize={getattr(self, 'page_size')}" endpoint = f"/openapi/v1/{getattr(self, 'omada_id')}/sites/{site_id}/devices?page=1&pageSize={getattr(self, 'page_size')}"
return self._make_request("GET", endpoint) return self._make_request("GET", endpoint)
def get_sites(self) -> Dict[str, Any]:
"""Make an endpoint request to populate all accesible sites."""
OmadaHelper.verbose("Retrieving all accessible sites")
endpoint = f"/openapi/v1/{getattr(self, 'omada_id')}/sites?page=1&pageSize={getattr(self, 'page_size')}"
return self._make_request("GET", endpoint)
def populate_sites(self) -> Dict[str, Any]: def populate_sites(self) -> Dict[str, Any]:
"""Make an endpoint request to populate sites.""" """Make an endpoint request to populate all accessible sites."""
try: OmadaHelper.verbose("Starting site population process")
OmadaHelper.verbose("Starting site population process")
# All allowed sites for credentials # Endpoint request
all_sites = self.get_sites()["response_result"].get("result").get("data", []) endpoint = f"/openapi/v1/{getattr(self, 'omada_id')}/sites?page=1&pageSize={getattr(self, 'page_size')}"
OmadaHelper.debug(f"Retrieved {len(all_sites)} sites in total") response = self._make_request("GET", endpoint)
# All available sites # Successful endpoint response
self.available_sites_dict = {site["siteId"]: site["name"] for site in all_sites} if response.get("response_type") == "success":
OmadaHelper.debug(f"Available sites: {self.available_sites_dict}") response_result = response.get("response_result")
if response_result.get("errorCode") == 0:
# All allowed sites for credentials
all_sites = response_result.get("result", "").get("data", [])
OmadaHelper.debug(f"Retrieved {len(all_sites)} sites in total")
# All valid sites from input # All available sites
active_sites_by_id = {site["siteId"]: site["name"] for site in all_sites if self.available_sites_dict = {site["siteId"]: site["name"] for site in all_sites}
site["siteId"] in self.requested_sites()} OmadaHelper.debug(f"Available sites: {self.available_sites_dict}")
active_sites_by_name = {site["siteId"]: site["name"] for site in all_sites if
site["name"] in self.requested_sites()}
self.active_sites_dict = active_sites_by_id | active_sites_by_name
OmadaHelper.debug(f"Active sites after filtering: {self.active_sites_dict}")
# If none of the input sites is valid/accessible, default to the first available site # All valid sites from input
if not self.active_sites_dict: active_sites_by_id = {site["siteId"]: site["name"] for site in all_sites if site["siteId"] in self.requested_sites()}
OmadaHelper.verbose( active_sites_by_name = {site["siteId"]: site["name"] for site in all_sites if site["name"] in self.requested_sites()}
"No valid site requested by configuration options, defaulting to first available site") self.active_sites_dict = active_sites_by_id | active_sites_by_name
first_available_site = next(iter(self.available_sites_dict.items()), (None, None)) OmadaHelper.debug(f"Active sites after filtering: {self.active_sites_dict}")
if first_available_site[0]: # Check if there's an available site
self.active_sites_dict = {first_available_site[0]: first_available_site[1]}
OmadaHelper.debug(f"Using first available site: {first_available_site}")
msg = f"Populated {len(self.active_sites_dict)} active sites" # If none of the input sites is valid/accessible, default to the first available site
OmadaHelper.verbose(msg) if not self.active_sites_dict:
result = OmadaHelper.response("success", msg) OmadaHelper.verbose("No valid site requested by configuration options, defaulting to first available site")
except Exception as ex: first_available_site = next(iter(self.available_sites_dict.items()), (None, None))
OmadaHelper.minimal("Failed to populate sites") if first_available_site[0]: # Check if there's an available site
msg = f"Site population error: {str(ex)}" self.active_sites_dict = {first_available_site[0]: first_available_site[1]}
OmadaHelper.verbose(msg) OmadaHelper.debug(f"Using first available site: {first_available_site}")
result = OmadaHelper.response("error", msg)
return result # Successful site population
msg = f"Successfully populated {len(self.active_sites_dict)} site(s)"
OmadaHelper.minimal(msg)
return OmadaHelper.response("success", msg)
# Failed site population
OmadaHelper.debug(f"Site population response: {response}")
return OmadaHelper.response("error", f"Site population failed - error: {response.get('response_message', 'Not provided')}")
def requested_sites(self) -> list: def requested_sites(self) -> list:
"""Returns sites requested by user.""" """Returns sites requested by user."""
@@ -368,98 +383,116 @@ class OmadaAPI:
return self.active_sites_dict return self.active_sites_dict
def make_entries(plugin_objects: Plugin_Objects, normalized_input_data: dict) -> None: class OmadaData:
if normalized_input_data.get("response_type", "error") != "success": @staticmethod
OmadaHelper.minimal( def create_data(plugin_objects: Plugin_Objects, normalized_input_data: dict) -> None:
f"Unable to make entries - error: {normalized_input_data.get('response_message', 'Not provided.')}") """Creates plugin object from normalized input data."""
return if normalized_input_data.get("response_type", "error") != "success":
OmadaHelper.minimal(f"Unable to make entries - error: {normalized_input_data.get('response_message', 'Not provided')}")
return
response_result = normalized_input_data.get("response_result", {}) # Loop through every device/client and make an plugin entry
for entry in response_result: response_result = normalized_input_data.get("response_result", {})
OmadaHelper.verbose(f"Making entry for: {entry['mac_address']}") for entry in response_result:
if len(entry) == 0:
OmadaHelper.minimal(f"Skipping entry, missing data.")
continue
parent_node = entry["parent_node_mac_address"] OmadaHelper.verbose(f"Making entry for: {entry['mac_address']}")
if len(parent_node) == 0 and entry["device_type"] == "gateway" and is_typical_router_ip(entry["ip_address"]):
parent_node = "Internet"
device_type = entry["device_type"].lower() # If the device_type is gateway, set the parent_node to Internet
if device_type == "iphone": device_type = entry["device_type"].lower()
device_type = "iPhone" parent_node = entry["parent_node_mac_address"]
elif device_type == "pc": if len(parent_node) == 0 and entry["device_type"] == "gateway" and is_typical_router_ip(entry["ip_address"]):
device_type = "PC" parent_node = "Internet"
else:
device_type = device_type.capitalize()
plugin_objects.add_object( # Some device type naming exceptions
primaryId=entry["mac_address"], if device_type == "iphone":
secondaryId=entry["ip_address"], device_type = "iPhone"
watched1=entry["name"], elif device_type == "pc":
watched2=parent_node, device_type = "PC"
watched3=entry["parent_node_port"], else:
watched4=entry["parent_node_ssid"], device_type = device_type.capitalize()
extra=device_type,
foreignKey=entry["mac_address"],
helpVal1=entry["last_seen"],
helpVal2=entry["site_name"],
helpVal3=entry["vlan_id"],
helpVal4="null"
)
# Add the plugin object
plugin_objects.add_object(
primaryId=entry["mac_address"],
secondaryId=entry["ip_address"],
watched1=entry["name"],
watched2=parent_node,
watched3=entry["parent_node_port"],
watched4=entry["parent_node_ssid"],
extra=device_type,
foreignKey=entry["mac_address"],
helpVal1=entry["last_seen"],
helpVal2=entry["site_name"],
helpVal3=entry["vlan_id"],
helpVal4="null"
)
def get_entries(plugin_objects: Plugin_Objects) -> Plugin_Objects: @staticmethod
OmadaHelper.minimal("Starting data collection process") def collect_data(plugin_objects: Plugin_Objects) -> Plugin_Objects:
omada_api = OmadaAPI(OPTIONS) """Collects device and client data from Omada Controller."""
omada_api = OmadaAPI(OPTIONS)
auth_result = omada_api.authenticate() # Authenticate
if auth_result["response_type"] == "error": auth_result = omada_api.authenticate()
OmadaHelper.minimal("Authentication failed, aborting data collection") if auth_result["response_type"] == "error":
OmadaHelper.debug(f"Authentication error - {auth_result['response_message']}") OmadaHelper.minimal("Authentication failed, aborting data collection")
OmadaHelper.debug(f"{auth_result['response_message']}")
return plugin_objects
# Populate sites
sites_result = omada_api.populate_sites()
if sites_result["response_type"] == "error":
OmadaHelper.minimal("Site population failed, aborting data collection")
OmadaHelper.debug(f"{sites_result['response_message']}")
return plugin_objects
requested_sites = omada_api.requested_sites()
available_sites = omada_api.available_sites()
active_sites = omada_api.active_sites()
OmadaHelper.verbose(f"Requested sites: {requested_sites}")
OmadaHelper.verbose(f"Available sites: {available_sites}")
OmadaHelper.verbose(f"Active sites: {active_sites}")
OmadaHelper.minimal("Starting data collection process")
# Loop through sites and collect data
for site_id, site_name in active_sites.items():
OmadaHelper.verbose(f"Processing site: {site_name} ({site_id})")
# Collect device data
devices_response = omada_api.get_devices(site_id)
if devices_response["response_type"] != "success":
OmadaHelper.minimal(f"Failed to retrieve devices for site: {site_name}")
else:
devices = devices_response["response_result"].get("result").get("data", [])
OmadaHelper.debug(f"Retrieved {len(devices)} device(s) from site: {site_name}")
devices = OmadaHelper.normalize_data(devices, "device", site_name, TIMEZONE)
OmadaData.create_data(plugin_objects, devices)
# Collect client data
clients_response = omada_api.get_clients(site_id)
if clients_response["response_type"] != "success":
OmadaHelper.minimal(f"Failed to retrieve clients for site {site_name}")
else:
clients = clients_response["response_result"].get("result").get("data", [])
OmadaHelper.debug(f"Retrieved {len(clients)} client(s) from site: {site_name}")
clients = OmadaHelper.normalize_data(clients, "client", site_name, TIMEZONE)
OmadaData.create_data(plugin_objects, clients)
OmadaHelper.verbose(f"Site complete: {site_name} ({site_id})")
# Complete collection and return plugin object
OmadaHelper.minimal("Completed data collection process")
return plugin_objects return plugin_objects
sites_result = omada_api.populate_sites()
if sites_result["response_type"] == "error":
OmadaHelper.minimal("Site population failed, aborting data collection")
OmadaHelper.debug(f"Site population error - {auth_result['response_message']}")
return plugin_objects
requested_sites = omada_api.requested_sites()
available_sites = omada_api.available_sites()
active_sites = omada_api.active_sites()
OmadaHelper.verbose(f"Requested sites: {requested_sites}")
OmadaHelper.verbose(f"Available sites: {available_sites}")
OmadaHelper.minimal(f"Active sites: {active_sites}")
for site_id, site_name in active_sites.items():
OmadaHelper.verbose(f"Processing site: {site_name} ({site_id})")
devices_response = omada_api.get_devices(site_id)
if devices_response["response_type"] == "error":
OmadaHelper.minimal(f"Failed to retrieve devices for site: {site_name}")
else:
devices = devices_response["response_result"].get("result").get("data", [])
OmadaHelper.debug(f"Retrieved {len(devices)} device(s) from site: {site_name}")
devices = OmadaHelper.normalize_data(devices, "device", site_name, TIMEZONE)
make_entries(plugin_objects, devices)
clients_response = omada_api.get_clients(site_id)
if clients_response["response_type"] == "error":
OmadaHelper.minimal(f"Failed to retrieve clients for site {site_name}")
else:
clients = clients_response["response_result"].get("result").get("data", [])
OmadaHelper.debug(f"Retrieved {len(clients)} client(s) from site: {site_name}")
clients = OmadaHelper.normalize_data(clients, "client", site_name, TIMEZONE)
make_entries(plugin_objects, clients)
OmadaHelper.verbose(f"Site complete: {site_name} ({site_id})")
OmadaHelper.minimal("Data collection process completed")
return plugin_objects
def main(): def main():
start_time = time.time() start_time = time.time()
OmadaHelper.minimal("Starting execution") OmadaHelper.minimal(f"Starting execution, version {__version__}")
# Initialize the Plugin object output file # Initialize the Plugin object output file
plugin_objects = Plugin_Objects(RESULT_FILE) plugin_objects = Plugin_Objects(RESULT_FILE)
@@ -477,8 +510,8 @@ def main():
} }
OmadaHelper.verbose("Configuration options loaded") OmadaHelper.verbose("Configuration options loaded")
# Retrieve entries # Retrieve entries and write result
plugin_objects = get_entries(plugin_objects) plugin_objects = OmadaData.collect_data(plugin_objects)
plugin_objects.write_result_file() plugin_objects.write_result_file()
# Finish # Finish