mirror of
https://github.com/jokob-sk/NetAlertX.git
synced 2025-12-07 09:36:05 -08:00
UNIFIAPI v0.4
This commit is contained in:
@@ -4,6 +4,7 @@ from pytz import timezone, all_timezones, UnknownTimeZoneError
|
|||||||
import sys
|
import sys
|
||||||
import re
|
import re
|
||||||
import base64
|
import base64
|
||||||
|
import json
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
|
|
||||||
INSTALL_PATH = "/app"
|
INSTALL_PATH = "/app"
|
||||||
@@ -116,6 +117,51 @@ def decodeBase64(inputParamBase64):
|
|||||||
|
|
||||||
return result
|
return result
|
||||||
|
|
||||||
|
# -------------------------------------------------------------------
|
||||||
|
def decode_settings_base64(encoded_str, convert_types=True):
|
||||||
|
"""
|
||||||
|
Decodes a base64-encoded JSON list of settings into a dict.
|
||||||
|
|
||||||
|
Each setting entry format:
|
||||||
|
[group, key, type, value]
|
||||||
|
|
||||||
|
Example:
|
||||||
|
[
|
||||||
|
["group", "name", "string", "Home - local"],
|
||||||
|
["group", "base_url", "string", "https://..."],
|
||||||
|
["group", "api_version", "integer", "2"],
|
||||||
|
["group", "verify_ssl", "boolean", "False"]
|
||||||
|
]
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
{
|
||||||
|
"name": "Home - local",
|
||||||
|
"base_url": "https://...",
|
||||||
|
"api_version": 2,
|
||||||
|
"verify_ssl": False
|
||||||
|
}
|
||||||
|
"""
|
||||||
|
decoded_json = base64.b64decode(encoded_str).decode("utf-8")
|
||||||
|
settings_list = json.loads(decoded_json)
|
||||||
|
|
||||||
|
settings_dict = {}
|
||||||
|
for _, key, _type, value in settings_list:
|
||||||
|
if convert_types:
|
||||||
|
_type_lower = _type.lower()
|
||||||
|
if _type_lower == "boolean":
|
||||||
|
settings_dict[key] = value.lower() == "true"
|
||||||
|
elif _type_lower == "integer":
|
||||||
|
settings_dict[key] = int(value)
|
||||||
|
elif _type_lower == "float":
|
||||||
|
settings_dict[key] = float(value)
|
||||||
|
else:
|
||||||
|
settings_dict[key] = value
|
||||||
|
else:
|
||||||
|
settings_dict[key] = value
|
||||||
|
|
||||||
|
return settings_dict
|
||||||
|
|
||||||
|
|
||||||
# -------------------------------------------------------------------
|
# -------------------------------------------------------------------
|
||||||
def normalize_mac(mac):
|
def normalize_mac(mac):
|
||||||
# Split the MAC address by colon (:) or hyphen (-) and convert each part to uppercase
|
# Split the MAC address by colon (:) or hyphen (-) and convert each part to uppercase
|
||||||
|
|||||||
@@ -573,24 +573,6 @@
|
|||||||
},
|
},
|
||||||
{
|
{
|
||||||
"column": "Watched_Value2",
|
"column": "Watched_Value2",
|
||||||
"mapped_to_column": "cur_Vendor",
|
|
||||||
"css_classes": "col-sm-2",
|
|
||||||
"show": true,
|
|
||||||
"type": "label",
|
|
||||||
"default_value": "",
|
|
||||||
"options": [],
|
|
||||||
"localized": [
|
|
||||||
"name"
|
|
||||||
],
|
|
||||||
"name": [
|
|
||||||
{
|
|
||||||
"language_code": "en_us",
|
|
||||||
"string": "Vendor"
|
|
||||||
}
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"column": "Watched_Value3",
|
|
||||||
"mapped_to_column": "cur_Type",
|
"mapped_to_column": "cur_Type",
|
||||||
"css_classes": "col-sm-2",
|
"css_classes": "col-sm-2",
|
||||||
"show": true,
|
"show": true,
|
||||||
@@ -608,9 +590,9 @@
|
|||||||
]
|
]
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"column": "Watched_Value4",
|
"column": "Watched_Value3",
|
||||||
"css_classes": "col-sm-2",
|
"css_classes": "col-sm-2",
|
||||||
"show": false,
|
"show": true,
|
||||||
"type": "label",
|
"type": "label",
|
||||||
"default_value": "",
|
"default_value": "",
|
||||||
"options": [],
|
"options": [],
|
||||||
@@ -620,7 +602,25 @@
|
|||||||
"name": [
|
"name": [
|
||||||
{
|
{
|
||||||
"language_code": "en_us",
|
"language_code": "en_us",
|
||||||
"string": "N/A"
|
"string": "Connected"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"column": "Watched_Value4",
|
||||||
|
"mapped_to_column": "cur_NetworkNodeMAC",
|
||||||
|
"css_classes": "col-sm-2",
|
||||||
|
"show": true,
|
||||||
|
"type": "device_mac",
|
||||||
|
"default_value": "",
|
||||||
|
"options": [],
|
||||||
|
"localized": [
|
||||||
|
"name"
|
||||||
|
],
|
||||||
|
"name": [
|
||||||
|
{
|
||||||
|
"language_code": "en_us",
|
||||||
|
"string": "Parent"
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
},
|
},
|
||||||
@@ -628,7 +628,7 @@
|
|||||||
"column": "Dummy",
|
"column": "Dummy",
|
||||||
"mapped_to_column": "cur_ScanMethod",
|
"mapped_to_column": "cur_ScanMethod",
|
||||||
"mapped_to_column_data": {
|
"mapped_to_column_data": {
|
||||||
"value": "Example Plugin"
|
"value": "UNIFIAPI"
|
||||||
},
|
},
|
||||||
"css_classes": "col-sm-2",
|
"css_classes": "col-sm-2",
|
||||||
"show": false,
|
"show": false,
|
||||||
|
|||||||
@@ -6,12 +6,13 @@ import sys
|
|||||||
import json
|
import json
|
||||||
import sqlite3
|
import sqlite3
|
||||||
from pytz import timezone
|
from pytz import timezone
|
||||||
|
from unifi_sm_api.api import SiteManagerAPI
|
||||||
|
|
||||||
# Define the installation path and extend the system path for plugin imports
|
# Define the installation path and extend the system path for plugin imports
|
||||||
INSTALL_PATH = "/app"
|
INSTALL_PATH = "/app"
|
||||||
sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
|
sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
|
||||||
|
|
||||||
from plugin_helper import Plugin_Object, Plugin_Objects, decodeBase64
|
from plugin_helper import Plugin_Object, Plugin_Objects, decodeBase64, decode_settings_base64
|
||||||
from plugin_utils import get_plugins_configs
|
from plugin_utils import get_plugins_configs
|
||||||
from logger import mylog, Logger
|
from logger import mylog, Logger
|
||||||
from const import pluginsPath, fullDbPath, logPath
|
from const import pluginsPath, fullDbPath, logPath
|
||||||
@@ -26,7 +27,7 @@ conf.tz = timezone(get_setting_value('TIMEZONE'))
|
|||||||
# Make sure log level is initialized correctly
|
# Make sure log level is initialized correctly
|
||||||
Logger(get_setting_value('LOG_LEVEL'))
|
Logger(get_setting_value('LOG_LEVEL'))
|
||||||
|
|
||||||
pluginName = '<unique_prefix>'
|
pluginName = 'UNIFIAPI'
|
||||||
|
|
||||||
# Define the current path and log file paths
|
# Define the current path and log file paths
|
||||||
LOG_PATH = logPath + '/plugins'
|
LOG_PATH = logPath + '/plugins'
|
||||||
@@ -42,83 +43,142 @@ def main():
|
|||||||
mylog('verbose', [f'[{pluginName}] In script'])
|
mylog('verbose', [f'[{pluginName}] In script'])
|
||||||
|
|
||||||
# Retrieve configuration settings
|
# Retrieve configuration settings
|
||||||
some_setting = get_setting_value('SYNC_plugins')
|
unifi_sites_configs = get_setting_value('UNIFIAPI_sites')
|
||||||
|
|
||||||
mylog('verbose', [f'[{pluginName}] some_setting value {some_setting}'])
|
mylog('verbose', [f'[{pluginName}] number of unifi_sites_configs: {len(unifi_sites_configs)}'])
|
||||||
|
|
||||||
# retrieve data
|
|
||||||
device_data = get_device_data(some_setting)
|
for site_config in unifi_sites_configs:
|
||||||
|
|
||||||
# Process the data into native application tables
|
siteDict = decode_settings_base64(site_config)
|
||||||
if len(device_data) > 0:
|
|
||||||
|
|
||||||
# insert devices into the lats_result.log
|
mylog('verbose', [f'[{pluginName}] siteDict: {json.dumps(siteDict)}'])
|
||||||
# make sure the below mapping is mapped in config.json, for example:
|
mylog('none', [f'[{pluginName}] Connecting to: {siteDict["name"]}'])
|
||||||
#"database_column_definitions": [
|
|
||||||
# {
|
|
||||||
# "column": "Object_PrimaryID", <--------- the value I save into primaryId
|
|
||||||
# "mapped_to_column": "cur_MAC", <--------- gets inserted into the CurrentScan DB table column cur_MAC
|
|
||||||
#
|
|
||||||
for device in device_data:
|
|
||||||
plugin_objects.add_object(
|
|
||||||
primaryId = device['mac_address'],
|
|
||||||
secondaryId = device['ip_address'],
|
|
||||||
watched1 = device['hostname'],
|
|
||||||
watched2 = device['vendor'],
|
|
||||||
watched3 = device['device_type'],
|
|
||||||
watched4 = device['last_seen'],
|
|
||||||
extra = '',
|
|
||||||
foreignKey = device['mac_address']
|
|
||||||
# helpVal1 = "Something1", # Optional Helper values to be passed for mapping into the app
|
|
||||||
# helpVal2 = "Something1", # If you need to use even only 1, add the remaining ones too
|
|
||||||
# helpVal3 = "Something1", # and set them to 'null'. Check the the docs for details:
|
|
||||||
# helpVal4 = "Something1", # https://github.com/jokob-sk/NetAlertX/blob/main/docs/PLUGINS_DEV.md
|
|
||||||
)
|
|
||||||
|
|
||||||
mylog('verbose', [f'[{pluginName}] New entries: "{len(device_data)}"'])
|
api = SiteManagerAPI(
|
||||||
|
api_key=siteDict["api_key"],
|
||||||
|
version=siteDict["api_version"],
|
||||||
|
base_url=siteDict["base_url"],
|
||||||
|
verify_ssl=siteDict["verify_ssl"]
|
||||||
|
)
|
||||||
|
|
||||||
# log result
|
sites_resp = api.get_sites()
|
||||||
plugin_objects.write_result_file()
|
sites = sites_resp.get("data", [])
|
||||||
|
|
||||||
|
for site in sites:
|
||||||
|
|
||||||
|
# retrieve data
|
||||||
|
device_data = get_device_data(site, api)
|
||||||
|
|
||||||
|
# Process the data into native application tables
|
||||||
|
if len(device_data) > 0:
|
||||||
|
|
||||||
|
# insert devices into the lats_result.log
|
||||||
|
for device in device_data:
|
||||||
|
plugin_objects.add_object(
|
||||||
|
primaryId = device['dev_mac'], # mac
|
||||||
|
secondaryId = device['dev_ip'], # IP
|
||||||
|
watched1 = device['dev_name'], # name
|
||||||
|
watched2 = device['dev_type'], # device_type (AP/Switch etc)
|
||||||
|
watched3 = device['dev_connected'], # connectedAt or empty
|
||||||
|
watched4 = device['dev_parent_mac'],# parent_mac or "Internet"
|
||||||
|
extra = '',
|
||||||
|
foreignKey = device['dev_mac']
|
||||||
|
)
|
||||||
|
|
||||||
|
mylog('verbose', [f'[{pluginName}] New entries: "{len(device_data)}"'])
|
||||||
|
|
||||||
|
# log result
|
||||||
|
plugin_objects.write_result_file()
|
||||||
|
|
||||||
return 0
|
return 0
|
||||||
|
|
||||||
# retrieve data
|
# retrieve data
|
||||||
def get_device_data(some_setting):
|
def get_device_data(site, api):
|
||||||
|
|
||||||
device_data = []
|
device_data = []
|
||||||
|
|
||||||
# do some processing, call exteranl APIs, and return a device_data list
|
mylog('verbose', [f'[{pluginName}] Site: {site} '])
|
||||||
# ...
|
site_id = site["id"]
|
||||||
#
|
site_name = site.get("name", "Unnamed Site")
|
||||||
# Sample data for testing purposes, you can adjust the processing in main() as needed
|
|
||||||
# ... before adding it to the plugin_objects.add_object(...)
|
mylog('verbose', [f'[{pluginName}] Site: {site_name} ({site_id})'])
|
||||||
device_data = [
|
|
||||||
{
|
# --- Devices ---
|
||||||
'device_id': 'device1',
|
unifi_devices_resp = api.get_unifi_devices(site_id)
|
||||||
'mac_address': '00:11:22:33:44:55',
|
unifi_devices = unifi_devices_resp.get("data", [])
|
||||||
'ip_address': '192.168.1.2',
|
mylog('verbose', [f'[{pluginName}] Site: {site_name} unifi devices: {json.dumps(unifi_devices_resp, indent=2)}'])
|
||||||
'hostname': 'iPhone 12',
|
|
||||||
'vendor': 'Apple Inc.',
|
# --- Clients ---
|
||||||
'device_type': 'Smartphone',
|
clients_resp = api.get_clients(site_id)
|
||||||
'last_seen': '2024-06-27 10:00:00',
|
clients = clients_resp.get("data", [])
|
||||||
'port': '1',
|
mylog('verbose', [f'[{pluginName}] Site: {site_name} clients: {json.dumps(clients_resp, indent=2)}'])
|
||||||
'network_id': 'network1'
|
|
||||||
},
|
# Build a lookup for devices by their 'id' to find parent MAC easily
|
||||||
{
|
device_id_to_mac = {dev['id']: dev.get('macAddress', '') for dev in unifi_devices}
|
||||||
'device_id': 'device2',
|
|
||||||
'mac_address': '00:11:22:33:44:66',
|
# Helper to resolve uplinkDeviceId to parent MAC, or "Internet" if no uplink
|
||||||
'ip_address': '192.168.1.3',
|
def resolve_parent_mac(uplink_id):
|
||||||
'hostname': 'Moto G82',
|
if not uplink_id:
|
||||||
'vendor': 'Motorola Inc.',
|
return "Internet"
|
||||||
'device_type': 'Laptop',
|
return device_id_to_mac.get(uplink_id, "Unknown")
|
||||||
'last_seen': '2024-06-27 10:05:00',
|
|
||||||
'port': '',
|
# Process Unifi devices
|
||||||
'network_id': 'network1'
|
for device in unifi_devices:
|
||||||
}
|
dev_mac = device.get('macAddress', '')
|
||||||
]
|
dev_ip = device.get('ipAddress', '')
|
||||||
|
dev_name = device.get('name', '')
|
||||||
|
# Determine device_type based on features and type
|
||||||
|
# If device has "accessPoint" feature => type "AP"
|
||||||
|
# Else if "switching" feature => type "Switch"
|
||||||
|
# fallback to "Unknown"
|
||||||
|
features = device.get('features', [])
|
||||||
|
if 'accessPoint' in features:
|
||||||
|
device_type = 'AP'
|
||||||
|
elif 'switching' in features:
|
||||||
|
device_type = 'Switch'
|
||||||
|
else:
|
||||||
|
device_type = 'Unknown'
|
||||||
|
|
||||||
|
dev_type = device_type
|
||||||
|
# No connectedAt for devices, so empty
|
||||||
|
dev_connected = ''
|
||||||
|
|
||||||
|
uplinkDeviceId = device.get('uplinkDeviceId', '')
|
||||||
|
dev_parent_mac = resolve_parent_mac(uplinkDeviceId)
|
||||||
|
|
||||||
|
device_data.append({
|
||||||
|
"dev_mac": dev_mac,
|
||||||
|
"dev_ip": dev_ip,
|
||||||
|
"dev_name": dev_name,
|
||||||
|
"dev_type": dev_type,
|
||||||
|
"dev_connected": dev_connected,
|
||||||
|
"dev_parent_mac": dev_parent_mac
|
||||||
|
})
|
||||||
|
|
||||||
|
# Process Clients (child devices connected to APs or switches)
|
||||||
|
for client in clients:
|
||||||
|
dev_mac = client.get('macAddress', '')
|
||||||
|
dev_ip = client.get('ipAddress', '')
|
||||||
|
dev_name = client.get('name', '')
|
||||||
|
device_type = ""
|
||||||
|
|
||||||
|
dev_type = device_type
|
||||||
|
dev_connected = client.get('connectedAt', '')
|
||||||
|
|
||||||
|
uplinkDeviceId = client.get('uplinkDeviceId', '')
|
||||||
|
dev_parent_mac = resolve_parent_mac(uplinkDeviceId)
|
||||||
|
|
||||||
|
device_data.append({
|
||||||
|
"dev_mac": dev_mac,
|
||||||
|
"dev_ip": dev_ip,
|
||||||
|
"dev_name": dev_name,
|
||||||
|
"dev_type": dev_type,
|
||||||
|
"dev_connected": dev_connected,
|
||||||
|
"dev_parent_mac": dev_parent_mac
|
||||||
|
})
|
||||||
|
|
||||||
# Return the data to be detected by the main application
|
|
||||||
return device_data
|
return device_data
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
main()
|
main()
|
||||||
|
|||||||
Reference in New Issue
Block a user