New plugin for Omada SDN import using OpenAPI

This commit is contained in:
xfilo
2025-02-24 00:51:44 +01:00
parent 500129c440
commit 4d77ff3ff1
4 changed files with 1233 additions and 0 deletions

View File

@@ -0,0 +1,75 @@
## 🔍 Overview
- This plugin imports online devices and clients from the Omada SDN (Omada Controller) through the provided OpenAPI.
### ✨ Features
1. Import online devices (gateways, switches, and access points) compatible with Omada SDN and send them to NetAlertX.
2. Import online clients (e.g., computers and smartphones) and send them to NetAlertX.
### 📌 Requirements
- Omada Controller with Open API support.
#### ✅ Officially supported controllers - [Source](https://community.tp-link.com/en/business/forum/topic/590430)
- All Omada Pro versions support Open API
- Omada Software/Hardware Controller support Open API since Controller v5.12
### ⚙️ Setup guide & settings
1. Login to your **Omada Controller**.
2. In the **Global Dashboard**, navigate to **Settings**, select **Platform Integration**, then click on **Open API**.
3. Create new credentials by clicking **Add New App**.
- The `App Name` can be anything.
- Set the `Mode` to `Client`.
- Set the `Role` to `Viewer` or `Administrator`.
- For `Site Privileges`, choose `All (Including all new-created sites)` or select specific site(s).
- Click `Apply` to create the application.
4. From the created application, you will need the following fields.
- `Omada ID` - visible by clicking the **eye** icon next to the **edit** and **delete** buttons.
- `Client ID`
- `Client Secret`
5. Open **NetAlertX's Settings**, head to **Omada SDN using OpenAPI** `(OMDSDNOPENAPI)` and configure the plugin.
- `OMDSDNOPENAPI_RUN` - When the scan should run, good option is `schedule`.
- `OMDSDNOPENAPI_host` - Specify the host URL of your **Omada Controller**, including the protocol, e.g., `https://example.com:1234`.
- `OMDSDNOPENAPI_omada_id` - Enter the **Omada ID** obtained in the previous step.
- `OMDSDNOPENAPI_client_id` - Enter the **Client ID** obtained in the previous step.
- `OMDSDNOPENAPI_client_secret` - Enter the **Client Secret** obtained in the previous step.
- `OMDSDNOPENAPI_sites` (optional) - You can enter either the **site name** or **site ID**. If an invalid value is provided or neither is specified, the plugin will default to the first accessible site using the supplied credentials.
- `OMDSDNOPENAPI_verify_ssl` - Check this option to enable SSL verification for requests to your Omada Controller's OpenAPI. If unchecked, SSL verification will be disabled.
### 📋 Data populated by the plugin
- This table outlines the data fields populated by the plugin, their conditions, descriptions, and where they are visible.
| 🔹 Field | 🔄 Population Condition | 📖 Description | 👀 Visibility |
|---|---|---|---|
| **MAC** | Always populated | The device's unique MAC address | Device details |
| **Last IP** | Always populated | The device's assigned IP address | Device details |
| **Name** | Always populated | The device name retrieved from Omada | Device details |
| **Parent Node** | Only if available | MAC address of the parent device (switch, AP, or gateway) | Device details |
| **Parent Node Port** | Only if available | The port number used to connect to the parent device | Device details |
| **SSID** | Only if available | The SSID through which the device is connected | Device details |
| **Device Type** | Only if available | Detected device type (e.g., iPhone, PC, Android) | Device details |
| **Last Seen** | Always populated | Last recorded time the device was active on the network | Plugin details |
| **Omada Site** | Always populated | Omada site to which the device is assigned | Device details |
| **VLAN ID** | Only if available | VLAN ID assigned to the device | Plugin details |
### ⚠️ Limitations and warnings
- The plugin can fetch up to 1000 devices and 1000 clients from the Omada Controller.
- Using non-Omada SDN compatible devices (e.g., switches, APs) may result in incomplete or inaccurate data.
### 🖼️ Examples
- Settings:
- ![settings_example](/front/plugins/omada_sdn_openapi_import/omada_sdn_openapi_import_settings.png)
### Other info
- Version: 1.0
- Author : [xfilo](https://github.com/xfilo)
- Release Date: 24-February-2025
- Omada Open API documentation: https://use1-omada-northbound.tplinkcloud.com/doc.html#/home (may take a moment to load)

View File

@@ -0,0 +1,669 @@
{
"code_name": "omada_sdn_openapi_import",
"unique_prefix": "OMDSDNOPENAPI",
"plugin_type": "device_scanner",
"execution_order" : "Layer_0",
"enabled": true,
"data_source": "script",
"mapped_to_table": "CurrentScan",
"data_filters": [
{
"compare_column": "Object_PrimaryID",
"compare_operator": "==",
"compare_field_id": "txtMacFilter",
"compare_js_template": "'{value}'.toString()",
"compare_use_quotes": true
}
],
"show_ui": true,
"localized": ["display_name", "description", "icon"],
"display_name": [
{
"language_code": "en_us",
"string": "Omada SDN using OpenAPI"
}
],
"description": [
{
"language_code": "en_us",
"string": "This plugin imports devices and clients from the Omada SDN (Omada Controller) through the provided OpenAPI."
}
],
"icon": [
{
"language_code": "en_us",
"string": "<i class=\"fa fa-search\"></i>"
}
],
"params": [],
"settings": [
{
"function": "RUN",
"events": ["run"],
"type": {
"dataType": "string",
"elements": [
{ "elementType": "select", "elementOptions": [], "transformers": [] }
]
},
"default_value": "disabled",
"options": ["disabled", "once", "schedule", "always_after_scan"],
"localized": ["name", "description"],
"name": [
{
"language_code": "en_us",
"string": "When to run"
}
],
"description": [
{
"language_code": "en_us",
"string": "When the scan should run, good option is <code>schedule</code>."
}
]
},
{
"function": "RUN_SCHD",
"type": {
"dataType": "string",
"elements": [
{
"elementType": "span",
"elementOptions": [
{
"cssClasses": "input-group-addon validityCheck"
},
{
"getStringKey": "Gen_ValidIcon"
}
],
"transformers": []
},
{
"elementType": "input",
"elementOptions": [
{
"onChange": "validateRegex(this)"
},
{
"base64Regex": "Xig/OlwqfCg/OlswLTldfFsxLTVdWzAtOV18WzAtOV0rLVswLTldK3xcKi9bMC05XSspKVxzKyg/OlwqfCg/OlswLTldfDFbMC05XXwyWzAtM118WzAtOV0rLVswLTldK3xcKi9bMC05XSspKVxzKyg/OlwqfCg/OlsxLTldfFsxMl1bMC05XXwzWzAxXXxbMC05XSstWzAtOV0rfFwqL1swLTldKykpXHMrKD86XCp8KD86WzEtOV18MVswLTJdfFswLTldKy1bMC05XSt8XCovWzAtOV0rKSlccysoPzpcKnwoPzpbMC02XXxbMC02XS1bMC02XXxcKi9bMC05XSspKSQ="
}
],
"transformers": []
}
]
},
"default_value": "*/5 * * * *",
"options": [],
"localized": ["name", "description"],
"name": [
{
"language_code": "en_us",
"string": "Schedule"
}
],
"description": [
{
"language_code": "en_us",
"string": "Only enabled if you select <code>schedule</code> in the <a href=\"#OMDSDNOPENAPI_RUN\"><code>OMDSDNOPENAPI_RUN</code> setting</a>. Make sure you enter the schedule in the correct cron-like format (e.g. validate at <a href=\"https://crontab.guru/\" target=\"_blank\">crontab.guru</a>). For example entering <code>0 4 * * *</code> will run the scan after 4 am in the selected <a onclick=\"toggleAllSettings()\" href=\"#TIMEZONE\"><code>TIMEZONE</code></a>. Will be run NEXT time the time passes."
}
]
},
{
"function": "host",
"type": {
"dataType": "string",
"elements": [
{ "elementType": "input", "elementOptions": [], "transformers": [] }
]
},
"maxLength": 100,
"default_value": "",
"options": [],
"localized": ["name", "description"],
"name": [
{
"language_code": "en_us",
"string": "Host URL"
}
],
"description": [
{
"language_code": "en_us",
"string": "Specify the host URL of your <code>Omada Controller</code>, including the protocol, eg. <code>https://example.com:1234</code>."
}
]
},
{
"function": "omada_id",
"type": {
"dataType": "string",
"elements": [
{ "elementType": "input", "elementOptions": [], "transformers": [] }
]
},
"maxLength": 100,
"default_value": "",
"options": [],
"localized": ["name", "description"],
"name": [
{
"language_code": "en_us",
"string": "Omada ID"
}
],
"description": [
{
"language_code": "en_us",
"string": "Provide your <code>Omada ID</code>, which can be found in the <code>OpenAPI</code> section of your <code>Omada Controller</code>."
}
]
},
{
"function": "client_id",
"type": {
"dataType": "string",
"elements": [
{ "elementType": "input", "elementOptions": [], "transformers": [] }
]
},
"maxLength": 100,
"default_value": "",
"options": [],
"localized": ["name", "description"],
"name": [
{
"language_code": "en_us",
"string": "Client ID"
}
],
"description": [
{
"language_code": "en_us",
"string": "Enter the <code>Client ID</code> generated by your <code>Omada Controller</code> in the <code>OpenAPI</code> section."
}
]
},
{
"function": "client_secret",
"type": {
"dataType": "string",
"elements": [
{
"elementType": "input",
"elementOptions": [{ "type": "password" }],
"transformers": []
}
]
},
"maxLength": 100,
"default_value": "",
"options": [],
"localized": ["name", "description"],
"name": [
{
"language_code": "en_us",
"string": "Client Secret"
}
],
"description": [
{
"language_code": "en_us",
"string": "Input the <code>Client Secret</code> obtained from the <code>OpenAPI</code> section of your <code>Omada Controller</code>."
}
]
},
{
"function": "sites",
"type": {
"dataType": "array",
"elements": [
{
"elementType": "input",
"elementOptions": [
{ "placeholder": "Enter value" },
{ "suffix": "_in" },
{ "cssClasses": "col-sm-10" },
{ "prefillValue": "null" }
],
"transformers": []
},
{
"elementType": "button",
"elementOptions": [
{ "sourceSuffixes": ["_in"] },
{ "separator": "" },
{ "cssClasses": "col-xs-12" },
{ "onClick": "addList(this, false)" },
{ "getStringKey": "Gen_Add" }
],
"transformers": []
},
{
"elementType": "select",
"elementHasInputValue": 1,
"elementOptions": [
{ "multiple": "true" },
{ "readonly": "true" },
{ "editable": "true" }
],
"transformers": []
},
{
"elementType": "button",
"elementOptions": [
{ "sourceSuffixes": [] },
{ "separator": "" },
{ "cssClasses": "col-xs-6" },
{ "onClick": "removeAllOptions(this)" },
{ "getStringKey": "Gen_Remove_All" }
],
"transformers": []
},
{
"elementType": "button",
"elementOptions": [
{ "sourceSuffixes": [] },
{ "separator": "" },
{ "cssClasses": "col-xs-6" },
{ "onClick": "removeFromList(this)" },
{ "getStringKey": "Gen_Remove_Last" }
],
"transformers": []
}
]
},
"default_value": [],
"options": [],
"localized": ["name", "description"],
"name": [
{
"language_code": "en_us",
"string": "Omada Sites"
}
],
"description": [
{
"language_code": "en_us",
"string": "You can enter either the <code>site name</code> or <code>site ID</code>. If an invalid value is provided or neither is specified, the plugin will default to the first accessible site using the supplied credentials."
}
]
},
{
"function": "verify_ssl",
"type": {
"dataType": "boolean",
"elements": [
{
"elementType": "input",
"elementOptions": [{ "type": "checkbox" }],
"transformers": []
}
]
},
"default_value": true,
"options": [],
"localized": ["name", "description"],
"name": [
{
"language_code": "en_us",
"string": "Verify SSL"
}
],
"description": [
{
"language_code": "en_us",
"string": "Check this option to enable SSL verification for requests to your Omada Controller's OpenAPI. If unchecked, SSL verification will be disabled."
}
]
},
{
"function": "CMD",
"type": {
"dataType": "string",
"elements": [
{
"elementType": "input",
"elementOptions": [{ "readonly": "true" }],
"transformers": []
}
]
},
"default_value": "python3 /app/front/plugins/omada_sdn_openapi_import/script.py",
"options": [],
"localized": ["name", "description"],
"name": [
{
"language_code": "en_us",
"string": "Command"
}
],
"description": [
{
"language_code": "en_us",
"string": "Command to run. This can not be changed."
}
]
},
{
"function": "RUN_TIMEOUT",
"type": {
"dataType": "integer",
"elements": [
{
"elementType": "input",
"elementOptions": [{ "type": "number" }],
"transformers": []
}
]
},
"default_value": 30,
"options": [],
"localized": ["name", "description"],
"name": [
{
"language_code": "en_us",
"string": "Run timeout"
}
],
"description": [
{
"language_code": "en_us",
"string": "Maximum time in seconds to wait for the script to finish. If this time is exceeded the script is aborted."
}
]
},
{
"default_value": [],
"description": [
{
"language_code": "en_us",
"string": "Send a notification if selected values change. Use <code>CTRL + Click</code> to select/deselect. <ul> <li><code>Watched_Value1</code> is Device Name </li><li><code>Watched_Value2</code> is Parent Node MAC</li><li><code>Watched_Value3</code> is Parent Node Port </li><li><code>Watched_Value4</code> is Parent Node SSID </li></ul>"
}
],
"function": "WATCH",
"localized": ["name", "description"],
"name": [
{
"language_code": "en_us",
"string": "Watched"
}
],
"options": [
"Watched_Value1",
"Watched_Value2",
"Watched_Value3",
"Watched_Value4"
],
"type": {
"dataType": "array",
"elements": [
{
"elementType": "select",
"elementOptions": [{ "multiple": "true" }],
"transformers": []
}
]
}
},
{
"default_value": ["new", "watched-changed"],
"description": [
{
"language_code": "en_us",
"string": "Send a notification only on these statuses. <code>new</code> means a new unique (unique combination of PrimaryId and SecondaryId) object was discovered. <code>watched-changed</code> means that selected <code>Watched_ValueN</code> columns changed."
}
],
"function": "REPORT_ON",
"localized": ["name", "description"],
"name": [
{
"language_code": "en_us",
"string": "Report on"
}
],
"options": [
"new",
"watched-changed",
"watched-not-changed",
"missing-in-last-scan"
],
"type": {
"dataType": "array",
"elements": [
{
"elementType": "select",
"elementOptions": [{ "multiple": "true" }],
"transformers": []
}
]
}
}
],
"database_column_definitions": [
{
"column": "Index",
"css_classes": "col-sm-2",
"show": true,
"type": "none",
"default_value": "",
"options": [],
"localized": ["name"],
"name": [
{
"language_code": "en_us",
"string": "Index"
}
]
},
{
"column": "Object_PrimaryID",
"mapped_to_column": "cur_MAC",
"css_classes": "col-sm-3",
"show": true,
"type": "device_name_mac",
"default_value": "",
"options": [],
"localized": ["name"],
"name": [
{
"language_code": "en_us",
"string": "MAC Address"
}
]
},
{
"column": "Object_SecondaryID",
"mapped_to_column": "cur_IP",
"css_classes": "col-sm-2",
"show": true,
"type": "device_ip",
"default_value": "",
"options": [],
"localized": ["name"],
"name": [
{
"language_code": "en_us",
"string": "IP Address"
}
]
},
{
"column": "Watched_Value1",
"mapped_to_column": "cur_Name",
"css_classes": "col-sm-2",
"show": true,
"type": "label",
"default_value": "",
"options": [],
"localized": ["name"],
"name": [
{
"language_code": "en_us",
"string": "Device Name"
}
]
},
{
"column": "Watched_Value2",
"mapped_to_column": "cur_NetworkNodeMAC",
"css_classes": "col-sm-2",
"show": true,
"type": "device_name_mac",
"default_value": "",
"options": [],
"localized": ["name"],
"name": [
{
"language_code": "en_us",
"string": "Parent Node MAC"
}
]
},
{
"column": "Watched_Value3",
"mapped_to_column": "cur_PORT",
"css_classes": "col-sm-2",
"show": true,
"type": "label",
"default_value": "",
"options": [],
"localized": ["name"],
"name": [
{
"language_code": "en_us",
"string": "Parent Node Port"
}
]
},
{
"column": "Watched_Value4",
"mapped_to_column": "cur_SSID",
"css_classes": "col-sm-2",
"show": true,
"type": "label",
"default_value": "",
"options": [],
"localized": ["name"],
"name": [
{
"language_code": "en_us",
"string": "Parent Node SSID"
}
]
},
{
"column": "Extra",
"mapped_to_column": "cur_Type",
"css_classes": "col-sm-2",
"show": false,
"type": "label",
"default_value": "",
"options": [],
"localized": ["name"],
"name": [
{
"language_code": "en_us",
"string": "Device Type"
}
]
},
{
"column": "Dummy",
"mapped_to_column": "cur_ScanMethod",
"mapped_to_column_data": {
"value": "OMDSDNOPENAPI"
},
"css_classes": "col-sm-2",
"show": false,
"type": "label",
"default_value": "",
"options": [],
"localized": ["name"],
"name": [
{
"language_code": "en_us",
"string": "Scan method"
}
]
},
{
"column": "DateTimeCreated",
"css_classes": "col-sm-2",
"show": true,
"type": "label",
"default_value": "",
"options": [],
"localized": ["name"],
"name": [
{
"language_code": "en_us",
"string": "Created"
}
]
},
{
"column": "DateTimeChanged",
"css_classes": "col-sm-2",
"show": false,
"type": "label",
"default_value": "",
"options": [],
"localized": ["name"],
"name": [
{
"language_code": "en_us",
"string": "Changed"
}
]
},
{
"column": "HelpVal1",
"css_classes": "col-sm-2",
"show": true,
"type": "label",
"default_value": "",
"options": [],
"localized": ["name"],
"name": [
{
"language_code": "en_us",
"string": "Last Seen"
}
]
},
{
"column": "HelpVal2",
"mapped_to_column": "cur_NetworkSite",
"css_classes": "col-sm-2",
"show": true,
"type": "label",
"default_value": "",
"options": [],
"localized": ["name"],
"name": [
{
"language_code": "en_us",
"string": "Omada Site"
}
]
},
{
"column": "HelpVal3",
"css_classes": "col-sm-2",
"show": true,
"type": "label",
"default_value": "",
"options": [],
"localized": ["name"],
"name": [
{
"language_code": "en_us",
"string": "VLAN ID"
}
]
}
]
}

Binary file not shown.

After

Width:  |  Height:  |  Size: 176 KiB

View File

@@ -0,0 +1,489 @@
#!/usr/bin/env python
"""
This plugin imports devices and clients from Omada Controller using their OpenAPI.
It was inspired by the 'omada_sdn_imp/omada_sdn.py' plugin,
which relied on the 'tplink_omada_client' library instead of OpenAPI.
However, I found that approach somewhat unstable, so I decided
to give it a shot and create a new plugin with the goal of providing
same, but more reliable results.
Please note that this is my first plugin, and Im not a Python developer.
Any comments, bug fixes, or contributions are greatly appreciated.
Author: https://github.com/xfilo
"""
__author__ = "xfilo"
__version__ = 0.1 # Initial version
import os
import sys
import urllib3
import requests
import time
import datetime
import pytz
from datetime import datetime
from typing import Literal, Any, Dict
# Define the installation path and extend the system path for plugin imports
INSTALL_PATH = "/app"
sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
from plugin_helper import Plugin_Objects, is_typical_router_ip, is_mac
from logger import mylog, Logger
from const import logPath
from helper import get_setting_value
import conf
# Make sure the TIMEZONE for logging is correct
conf.tz = pytz.timezone(get_setting_value('TIMEZONE'))
# Make sure log level is initialized correctly
Logger(get_setting_value('LOG_LEVEL'))
pluginName = 'OMDSDNOPENAPI'
# Define the current path and log file paths
LOG_PATH = logPath + '/plugins'
LOG_FILE = os.path.join(LOG_PATH, f'script.{pluginName}.log')
RESULT_FILE = os.path.join(LOG_PATH, f'last_result.{pluginName}.log')
# Disable insecure request warning
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
class OmadaHelper:
@staticmethod
def log(message: str, level: Literal["minimal", "verbose", "debug", "trace"] = "minimal") -> None:
mylog(level, [f"[{pluginName}] {message}"])
@staticmethod
def debug(message: str) -> None:
return OmadaHelper.log(message, "debug")
@staticmethod
def verbose(message: str) -> None:
return OmadaHelper.log(message, "verbose")
@staticmethod
def minimal(message: str) -> None:
return OmadaHelper.log(message, "minimal")
@staticmethod
def response(response_type: str, response_message: str, response_result: Any = None) -> Dict[str, Any]:
return {"response_type": response_type, "response_message": response_message,
"response_result": response_result}
@staticmethod
def timestamp_to_datetime(ms: int, timezone: str) -> Dict[str, Any]:
"""Returns datetime from millisecond timestamp with required timezone."""
try:
if not ms or not isinstance(ms, (str, int)):
raise ValueError(f"Value '{ms}' is not a valid timestamp")
timestamp = ms / 1000
tz = pytz.timezone("UTC")
utc_datetime = datetime.fromtimestamp(timestamp, tz=tz)
target_timezone = pytz.timezone(timezone)
local_datetime = utc_datetime.astimezone(target_timezone)
result = local_datetime.strftime("%Y-%m-%d %H:%M:%S")
msg = f"Converted timestamp {ms} to datetime {result} with timezone {timezone}"
OmadaHelper.debug(msg)
return OmadaHelper.response("success", msg, result)
except pytz.UnknownTimeZoneError:
msg = f"Failed to convert timestamp - unknown timezone: {timezone}"
OmadaHelper.verbose(msg)
return OmadaHelper.response("error", msg)
except Exception as ex:
msg = f"Failed to convert timestamp - error: {str(ex)}"
OmadaHelper.verbose(msg)
return OmadaHelper.response("error", msg)
@staticmethod
def normalize_mac(mac: str) -> Dict[str, Any]:
"""Returns a normalized version of MAC address."""
try:
if not mac or not isinstance(mac, str) or mac is None:
raise Exception(f"Value '{mac}' is not a valid MAC address")
result = mac.lower().replace("-", ":")
msg = f"Normalized MAC address from {mac} to {result}"
OmadaHelper.debug(msg)
return OmadaHelper.response("success", msg, result)
except Exception as ex:
msg = f"Failed to normalize MAC address '{mac}' - error: {str(ex)}"
OmadaHelper.verbose(msg)
return OmadaHelper.response("error", msg)
@staticmethod
def normalize_data(input_data: list, input_type: str, site_name: str, timezone: str) -> Dict[str, Any]:
"""Returns a normalized dictionary of input data (clients, devices)."""
try:
if not isinstance(input_data, list):
raise Exception(f"Expected a list, but got '{type(input_data)}'.")
OmadaHelper.verbose(f"Starting normalization of {len(input_data)} {input_type}(s) from site {site_name}")
default_entry = {
"mac_address": "",
"ip_address": "",
"name": "",
"last_seen": "",
"site_name": site_name,
"parent_node_mac_address": "",
"parent_node_port": "",
"parent_node_ssid": "",
"vlan_id": "",
}
result = []
for data in input_data:
mac = OmadaHelper.normalize_mac(data.get("mac"))
if not isinstance(mac, dict) or mac.get("response_type") != "success":
continue
mac = mac.get("response_result")
if not is_mac(mac):
OmadaHelper.debug(f"Skipping {input_type}, not a MAC address: {mac}")
continue
entry = default_entry.copy()
entry["mac_address"] = mac
entry["ip_address"] = data.get("ip", "")
entry["name"] = data.get("name", "")
last_seen = OmadaHelper.timestamp_to_datetime(data.get("lastSeen", 0), timezone)
entry["last_seen"] = last_seen.get("response_result") if isinstance(last_seen, dict) and last_seen.get(
"response_type") == "success" else ""
if input_type == "device":
entry["device_type"] = data.get("type")
if data.get("type", "") != "gateway":
parent_mac = OmadaHelper.normalize_mac(data.get("uplinkDeviceMac"))
parent_mac = parent_mac.get("response_result") if isinstance(parent_mac,
dict) and parent_mac.get(
"response_type") == "success" else ""
entry["parent_node_mac_address"] = parent_mac
if input_type == "client":
entry["vlan_id"] = data.get("vid")
entry["device_type"] = data.get("deviceType")
if data.get("connectDevType", "") == "gateway":
parent_mac = OmadaHelper.normalize_mac(data.get("gatewayMac"))
entry["parent_node_mac_address"] = parent_mac.get("response_result") if isinstance(parent_mac,
dict) and parent_mac.get(
"response_type") == "success" else ""
entry["parent_node_port"] = data.get("port", "")
elif data.get("connectDevType", "") == "switch":
parent_mac = OmadaHelper.normalize_mac(data.get("switchMac"))
entry["parent_node_mac_address"] = parent_mac.get("response_result") if isinstance(parent_mac,
dict) and parent_mac.get(
"response_type") == "success" else ""
entry["parent_node_port"] = data.get("port", "")
elif data.get("connectDevType", "") == "ap":
parent_mac = OmadaHelper.normalize_mac(data.get("apMac"))
entry["parent_node_mac_address"] = parent_mac.get("response_result") if isinstance(parent_mac,
dict) and parent_mac.get(
"response_type") == "success" else ""
entry["parent_node_ssid"] = data.get("ssid", "")
result.append(entry)
OmadaHelper.debug(f"Processed {input_type} entry: {entry}")
msg = f"Successfully normalized {len(result)} {input_type}(s) from site: {site_name}"
OmadaHelper.verbose(msg)
final_result = OmadaHelper.response("success", msg, result)
except Exception as ex:
msg = f"Failed normalizing {input_type}(s) from site '{site_name}' - error: {str(ex)}"
OmadaHelper.verbose(msg)
final_result = OmadaHelper.response("error", msg)
return final_result
class OmadaAPI:
def __init__(self, options: dict):
OmadaHelper.debug("Initializing OmadaAPI with provided options")
# Define parameters: required, optional, and default values
params = {
"host": {"type": str, "required": True},
"omada_id": {"type": str, "required": True},
"client_id": {"type": str, "required": True},
"client_secret": {"type": str, "required": True},
"verify_ssl": {"type": bool, "required": False, "default": True},
"page_size": {"type": int, "required": False, "default": 1000},
"sites": {"type": list, "required": False, "default": []}
}
# Validate and set attributes
for param_name, param_info in params.items():
value = options.get(param_name, param_info.get("default"))
if param_info["required"] and (value is None or (param_info["type"] == str and not value)):
raise ValueError(f"{param_name} is required and must be a non-empty {param_info['type'].__name__}")
if not isinstance(value, param_info["type"]):
raise TypeError(f"{param_name} must be of type {param_info['type'].__name__}")
setattr(self, param_name, value)
OmadaHelper.debug(f"Initialized option '{param_name}' with value: {value}")
# Other parameters
self.available_sites_dict = {}
self.active_sites_dict = {}
self.access_token = None
self.refresh_token = None
OmadaHelper.verbose("OmadaAPI initialized")
def _get_headers(self, include_auth: bool = True) -> dict:
"""Return request headers."""
headers = {"Content-type": "application/json"}
if include_auth == True:
if not self.access_token:
OmadaHelper.debug("No access token available for headers")
else:
headers["Authorization"] = f"AccessToken={self.access_token}"
OmadaHelper.debug(f"Generated headers: {headers}")
return headers
def _make_request(self, method: str, endpoint: str, **kwargs: Any) -> Dict[str, Any]:
time.sleep(1) # Sleep before making any request so it does not rate-limited
OmadaHelper.verbose(f"{method} request to endpoint: {endpoint}")
url = f"{getattr(self, 'host')}{endpoint}"
headers = self._get_headers(kwargs.pop('include_auth', True))
try:
response = requests.request(method, url, headers=headers, verify=getattr(self, 'verify_ssl'), **kwargs)
response.raise_for_status()
data = response.json()
response_type = "error" if data.get("errorCode", 0) != 0 else "success"
msg = f"{method} request completed: {endpoint}"
OmadaHelper.minimal(msg)
return OmadaHelper.response(response_type, msg, data)
except requests.exceptions.RequestException as ex:
msg = f"{method} request failed: {str(ex)}"
OmadaHelper.minimal(f"{method} request to {endpoint} failed")
OmadaHelper.verbose(msg)
return OmadaHelper.response("error", msg)
def authenticate(self) -> Dict[str, any]:
"""Make an endpoint request to get access token."""
OmadaHelper.verbose("Starting authentication process")
endpoint = "/openapi/authorize/token?grant_type=client_credentials"
payload = {
"omadacId": getattr(self, 'omada_id'),
"client_id": getattr(self, 'client_id'),
"client_secret": getattr(self, 'client_secret')
}
response = self._make_request("POST", endpoint, json=payload, include_auth=False)
if response["response_type"] == "success":
token_data = response["response_result"]
if token_data.get("errorCode") == 0:
self.access_token = token_data["result"]["accessToken"]
self.refresh_token = token_data["result"]["refreshToken"]
OmadaHelper.minimal("Authentication successful")
return OmadaHelper.response("success", "Authenticated successfully")
OmadaHelper.minimal("Authentication failed")
OmadaHelper.debug(f"Authentication response: {response}")
return OmadaHelper.response("error",
f"Authentication failed - error: {response.get('response_result').get('msg')}")
def get_clients(self, site_id: str) -> Dict[str, Any]:
"""Make an endpoint request to get all online clients on a site."""
OmadaHelper.verbose(f"Retrieving clients for site: {site_id}")
endpoint = f"/openapi/v1/{getattr(self, 'omada_id')}/sites/{site_id}/clients?page=1&pageSize={getattr(self, 'page_size')}"
return self._make_request("GET", endpoint)
def get_devices(self, site_id: str) -> Dict[str, Any]:
"""Make an endpoint request to get all online devices on a site."""
OmadaHelper.verbose(f"Retrieving devices for site: {site_id}")
endpoint = f"/openapi/v1/{getattr(self, 'omada_id')}/sites/{site_id}/devices?page=1&pageSize={getattr(self, 'page_size')}"
return self._make_request("GET", endpoint)
def get_sites(self) -> Dict[str, Any]:
"""Make an endpoint request to populate all accesible sites."""
OmadaHelper.verbose("Retrieving all accessible sites")
endpoint = f"/openapi/v1/{getattr(self, 'omada_id')}/sites?page=1&pageSize={getattr(self, 'page_size')}"
return self._make_request("GET", endpoint)
def populate_sites(self) -> Dict[str, Any]:
"""Make an endpoint request to populate sites."""
try:
OmadaHelper.verbose("Starting site population process")
# All allowed sites for credentials
all_sites = self.get_sites()["response_result"].get("result").get("data", [])
OmadaHelper.debug(f"Retrieved {len(all_sites)} sites in total")
# All available sites
self.available_sites_dict = {site["siteId"]: site["name"] for site in all_sites}
OmadaHelper.debug(f"Available sites: {self.available_sites_dict}")
# All valid sites from input
active_sites_by_id = {site["siteId"]: site["name"] for site in all_sites if
site["siteId"] in self.requested_sites()}
active_sites_by_name = {site["siteId"]: site["name"] for site in all_sites if
site["name"] in self.requested_sites()}
self.active_sites_dict = active_sites_by_id | active_sites_by_name
OmadaHelper.debug(f"Active sites after filtering: {self.active_sites_dict}")
# If none of the input sites is valid/accessible, default to the first available site
if not self.active_sites_dict:
OmadaHelper.verbose(
"No valid site requested by configuration options, defaulting to first available site")
first_available_site = next(iter(self.available_sites_dict.items()), (None, None))
if first_available_site[0]: # Check if there's an available site
self.active_sites_dict = {first_available_site[0]: first_available_site[1]}
OmadaHelper.debug(f"Using first available site: {first_available_site}")
msg = f"Populated {len(self.active_sites_dict)} active sites"
OmadaHelper.verbose(msg)
result = OmadaHelper.response("success", msg)
except Exception as ex:
OmadaHelper.minimal("Failed to populate sites")
msg = f"Site population error: {str(ex)}"
OmadaHelper.verbose(msg)
result = OmadaHelper.response("error", msg)
return result
def requested_sites(self) -> list:
"""Returns sites requested by user."""
return getattr(self, 'sites')
def available_sites(self) -> dict:
"""Returns all available sites."""
return self.available_sites_dict
def active_sites(self) -> dict:
"""Returns the sites the code will use."""
return self.active_sites_dict
def make_entries(plugin_objects: Plugin_Objects, normalized_input_data: dict) -> None:
if normalized_input_data.get("response_type", "error") != "success":
OmadaHelper.minimal(
f"Unable to make entries - error: {normalized_input_data.get('response_message', 'Not provided.')}")
return
response_result = normalized_input_data.get("response_result", {})
for entry in response_result:
OmadaHelper.verbose(f"Making entry for: {entry['mac_address']}")
parent_node = entry["parent_node_mac_address"]
if len(parent_node) == 0 and entry["device_type"] == "gateway" and is_typical_router_ip(entry["ip_address"]):
parent_node = "Internet"
device_type = entry["device_type"].lower()
if device_type == "iphone":
device_type = "iPhone"
elif device_type == "pc":
device_type = "PC"
else:
device_type = device_type.capitalize()
plugin_objects.add_object(
primaryId=entry["mac_address"],
secondaryId=entry["ip_address"],
watched1=entry["name"],
watched2=parent_node,
watched3=entry["parent_node_port"],
watched4=entry["parent_node_ssid"],
extra=device_type,
foreignKey=entry["mac_address"],
helpVal1=entry["last_seen"],
helpVal2=entry["site_name"],
helpVal3=entry["vlan_id"],
helpVal4="null"
)
def get_entries(plugin_objects: Plugin_Objects) -> Plugin_Objects:
OmadaHelper.minimal("Starting data collection process")
omada_api = OmadaAPI(OPTIONS)
auth_result = omada_api.authenticate()
if auth_result["response_type"] == "error":
OmadaHelper.minimal("Authentication failed, aborting data collection")
OmadaHelper.debug(f"Authentication error - {auth_result['response_message']}")
return plugin_objects
sites_result = omada_api.populate_sites()
if sites_result["response_type"] == "error":
OmadaHelper.minimal("Site population failed, aborting data collection")
OmadaHelper.debug(f"Site population error - {auth_result['response_message']}")
return plugin_objects
requested_sites = omada_api.requested_sites()
available_sites = omada_api.available_sites()
active_sites = omada_api.active_sites()
OmadaHelper.verbose(f"Requested sites: {requested_sites}")
OmadaHelper.verbose(f"Available sites: {available_sites}")
OmadaHelper.minimal(f"Active sites: {active_sites}")
for site_id, site_name in active_sites.items():
OmadaHelper.verbose(f"Processing site: {site_name} ({site_id})")
devices_response = omada_api.get_devices(site_id)
if devices_response["response_type"] == "error":
OmadaHelper.minimal(f"Failed to retrieve devices for site: {site_name}")
else:
devices = devices_response["response_result"].get("result").get("data", [])
OmadaHelper.debug(f"Retrieved {len(devices)} device(s) from site: {site_name}")
devices = OmadaHelper.normalize_data(devices, "device", site_name, TIMEZONE)
make_entries(plugin_objects, devices)
clients_response = omada_api.get_clients(site_id)
if clients_response["response_type"] == "error":
OmadaHelper.minimal(f"Failed to retrieve clients for site {site_name}")
else:
clients = clients_response["response_result"].get("result").get("data", [])
OmadaHelper.debug(f"Retrieved {len(clients)} client(s) from site: {site_name}")
clients = OmadaHelper.normalize_data(clients, "client", site_name, TIMEZONE)
make_entries(plugin_objects, clients)
OmadaHelper.verbose(f"Site complete: {site_name} ({site_id})")
OmadaHelper.minimal("Data collection process completed")
return plugin_objects
def main():
start_time = time.time()
OmadaHelper.minimal("Starting execution")
# Initialize the Plugin object output file
plugin_objects = Plugin_Objects(RESULT_FILE)
# Retrieve options
global OPTIONS, TIMEZONE
TIMEZONE = get_setting_value("TIMEZONE")
OPTIONS = {
"host": get_setting_value(f"{pluginName}_host").strip(),
"client_id": get_setting_value(f"{pluginName}_client_id").strip(),
"client_secret": get_setting_value(f"{pluginName}_client_secret").strip(),
"omada_id": get_setting_value(f"{pluginName}_omada_id").strip(),
"sites": get_setting_value(f"{pluginName}_sites"),
"verify_ssl": get_setting_value(f"{pluginName}_verify_ssl")
}
OmadaHelper.verbose("Configuration options loaded")
# Retrieve entries
plugin_objects = get_entries(plugin_objects)
plugin_objects.write_result_file()
# Finish
OmadaHelper.minimal(f"Execution completed in {time.time() - start_time:.2f}s, found {len(plugin_objects)} devices and clients")
if __name__ == '__main__':
main()