mirror of
https://github.com/jokob-sk/NetAlertX.git
synced 2025-12-06 17:15:38 -08:00
UNIFIAPI v0.4
This commit is contained in:
@@ -4,6 +4,7 @@ from pytz import timezone, all_timezones, UnknownTimeZoneError
|
||||
import sys
|
||||
import re
|
||||
import base64
|
||||
import json
|
||||
from datetime import datetime
|
||||
|
||||
INSTALL_PATH = "/app"
|
||||
@@ -116,6 +117,51 @@ def decodeBase64(inputParamBase64):
|
||||
|
||||
return result
|
||||
|
||||
# -------------------------------------------------------------------
|
||||
def decode_settings_base64(encoded_str, convert_types=True):
|
||||
"""
|
||||
Decodes a base64-encoded JSON list of settings into a dict.
|
||||
|
||||
Each setting entry format:
|
||||
[group, key, type, value]
|
||||
|
||||
Example:
|
||||
[
|
||||
["group", "name", "string", "Home - local"],
|
||||
["group", "base_url", "string", "https://..."],
|
||||
["group", "api_version", "integer", "2"],
|
||||
["group", "verify_ssl", "boolean", "False"]
|
||||
]
|
||||
|
||||
Returns:
|
||||
{
|
||||
"name": "Home - local",
|
||||
"base_url": "https://...",
|
||||
"api_version": 2,
|
||||
"verify_ssl": False
|
||||
}
|
||||
"""
|
||||
decoded_json = base64.b64decode(encoded_str).decode("utf-8")
|
||||
settings_list = json.loads(decoded_json)
|
||||
|
||||
settings_dict = {}
|
||||
for _, key, _type, value in settings_list:
|
||||
if convert_types:
|
||||
_type_lower = _type.lower()
|
||||
if _type_lower == "boolean":
|
||||
settings_dict[key] = value.lower() == "true"
|
||||
elif _type_lower == "integer":
|
||||
settings_dict[key] = int(value)
|
||||
elif _type_lower == "float":
|
||||
settings_dict[key] = float(value)
|
||||
else:
|
||||
settings_dict[key] = value
|
||||
else:
|
||||
settings_dict[key] = value
|
||||
|
||||
return settings_dict
|
||||
|
||||
|
||||
# -------------------------------------------------------------------
|
||||
def normalize_mac(mac):
|
||||
# Split the MAC address by colon (:) or hyphen (-) and convert each part to uppercase
|
||||
|
||||
@@ -573,24 +573,6 @@
|
||||
},
|
||||
{
|
||||
"column": "Watched_Value2",
|
||||
"mapped_to_column": "cur_Vendor",
|
||||
"css_classes": "col-sm-2",
|
||||
"show": true,
|
||||
"type": "label",
|
||||
"default_value": "",
|
||||
"options": [],
|
||||
"localized": [
|
||||
"name"
|
||||
],
|
||||
"name": [
|
||||
{
|
||||
"language_code": "en_us",
|
||||
"string": "Vendor"
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"column": "Watched_Value3",
|
||||
"mapped_to_column": "cur_Type",
|
||||
"css_classes": "col-sm-2",
|
||||
"show": true,
|
||||
@@ -608,9 +590,9 @@
|
||||
]
|
||||
},
|
||||
{
|
||||
"column": "Watched_Value4",
|
||||
"column": "Watched_Value3",
|
||||
"css_classes": "col-sm-2",
|
||||
"show": false,
|
||||
"show": true,
|
||||
"type": "label",
|
||||
"default_value": "",
|
||||
"options": [],
|
||||
@@ -620,7 +602,25 @@
|
||||
"name": [
|
||||
{
|
||||
"language_code": "en_us",
|
||||
"string": "N/A"
|
||||
"string": "Connected"
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"column": "Watched_Value4",
|
||||
"mapped_to_column": "cur_NetworkNodeMAC",
|
||||
"css_classes": "col-sm-2",
|
||||
"show": true,
|
||||
"type": "device_mac",
|
||||
"default_value": "",
|
||||
"options": [],
|
||||
"localized": [
|
||||
"name"
|
||||
],
|
||||
"name": [
|
||||
{
|
||||
"language_code": "en_us",
|
||||
"string": "Parent"
|
||||
}
|
||||
]
|
||||
},
|
||||
@@ -628,7 +628,7 @@
|
||||
"column": "Dummy",
|
||||
"mapped_to_column": "cur_ScanMethod",
|
||||
"mapped_to_column_data": {
|
||||
"value": "Example Plugin"
|
||||
"value": "UNIFIAPI"
|
||||
},
|
||||
"css_classes": "col-sm-2",
|
||||
"show": false,
|
||||
|
||||
@@ -6,12 +6,13 @@ import sys
|
||||
import json
|
||||
import sqlite3
|
||||
from pytz import timezone
|
||||
from unifi_sm_api.api import SiteManagerAPI
|
||||
|
||||
# Define the installation path and extend the system path for plugin imports
|
||||
INSTALL_PATH = "/app"
|
||||
sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
|
||||
|
||||
from plugin_helper import Plugin_Object, Plugin_Objects, decodeBase64
|
||||
from plugin_helper import Plugin_Object, Plugin_Objects, decodeBase64, decode_settings_base64
|
||||
from plugin_utils import get_plugins_configs
|
||||
from logger import mylog, Logger
|
||||
from const import pluginsPath, fullDbPath, logPath
|
||||
@@ -26,7 +27,7 @@ conf.tz = timezone(get_setting_value('TIMEZONE'))
|
||||
# Make sure log level is initialized correctly
|
||||
Logger(get_setting_value('LOG_LEVEL'))
|
||||
|
||||
pluginName = '<unique_prefix>'
|
||||
pluginName = 'UNIFIAPI'
|
||||
|
||||
# Define the current path and log file paths
|
||||
LOG_PATH = logPath + '/plugins'
|
||||
@@ -42,83 +43,142 @@ def main():
|
||||
mylog('verbose', [f'[{pluginName}] In script'])
|
||||
|
||||
# Retrieve configuration settings
|
||||
some_setting = get_setting_value('SYNC_plugins')
|
||||
unifi_sites_configs = get_setting_value('UNIFIAPI_sites')
|
||||
|
||||
mylog('verbose', [f'[{pluginName}] some_setting value {some_setting}'])
|
||||
mylog('verbose', [f'[{pluginName}] number of unifi_sites_configs: {len(unifi_sites_configs)}'])
|
||||
|
||||
# retrieve data
|
||||
device_data = get_device_data(some_setting)
|
||||
|
||||
for site_config in unifi_sites_configs:
|
||||
|
||||
# Process the data into native application tables
|
||||
if len(device_data) > 0:
|
||||
siteDict = decode_settings_base64(site_config)
|
||||
|
||||
# insert devices into the lats_result.log
|
||||
# make sure the below mapping is mapped in config.json, for example:
|
||||
#"database_column_definitions": [
|
||||
# {
|
||||
# "column": "Object_PrimaryID", <--------- the value I save into primaryId
|
||||
# "mapped_to_column": "cur_MAC", <--------- gets inserted into the CurrentScan DB table column cur_MAC
|
||||
#
|
||||
for device in device_data:
|
||||
plugin_objects.add_object(
|
||||
primaryId = device['mac_address'],
|
||||
secondaryId = device['ip_address'],
|
||||
watched1 = device['hostname'],
|
||||
watched2 = device['vendor'],
|
||||
watched3 = device['device_type'],
|
||||
watched4 = device['last_seen'],
|
||||
extra = '',
|
||||
foreignKey = device['mac_address']
|
||||
# helpVal1 = "Something1", # Optional Helper values to be passed for mapping into the app
|
||||
# helpVal2 = "Something1", # If you need to use even only 1, add the remaining ones too
|
||||
# helpVal3 = "Something1", # and set them to 'null'. Check the the docs for details:
|
||||
# helpVal4 = "Something1", # https://github.com/jokob-sk/NetAlertX/blob/main/docs/PLUGINS_DEV.md
|
||||
)
|
||||
mylog('verbose', [f'[{pluginName}] siteDict: {json.dumps(siteDict)}'])
|
||||
mylog('none', [f'[{pluginName}] Connecting to: {siteDict["name"]}'])
|
||||
|
||||
mylog('verbose', [f'[{pluginName}] New entries: "{len(device_data)}"'])
|
||||
api = SiteManagerAPI(
|
||||
api_key=siteDict["api_key"],
|
||||
version=siteDict["api_version"],
|
||||
base_url=siteDict["base_url"],
|
||||
verify_ssl=siteDict["verify_ssl"]
|
||||
)
|
||||
|
||||
# log result
|
||||
plugin_objects.write_result_file()
|
||||
sites_resp = api.get_sites()
|
||||
sites = sites_resp.get("data", [])
|
||||
|
||||
for site in sites:
|
||||
|
||||
# retrieve data
|
||||
device_data = get_device_data(site, api)
|
||||
|
||||
# Process the data into native application tables
|
||||
if len(device_data) > 0:
|
||||
|
||||
# insert devices into the lats_result.log
|
||||
for device in device_data:
|
||||
plugin_objects.add_object(
|
||||
primaryId = device['dev_mac'], # mac
|
||||
secondaryId = device['dev_ip'], # IP
|
||||
watched1 = device['dev_name'], # name
|
||||
watched2 = device['dev_type'], # device_type (AP/Switch etc)
|
||||
watched3 = device['dev_connected'], # connectedAt or empty
|
||||
watched4 = device['dev_parent_mac'],# parent_mac or "Internet"
|
||||
extra = '',
|
||||
foreignKey = device['dev_mac']
|
||||
)
|
||||
|
||||
mylog('verbose', [f'[{pluginName}] New entries: "{len(device_data)}"'])
|
||||
|
||||
# log result
|
||||
plugin_objects.write_result_file()
|
||||
|
||||
return 0
|
||||
|
||||
# retrieve data
|
||||
def get_device_data(some_setting):
|
||||
|
||||
def get_device_data(site, api):
|
||||
device_data = []
|
||||
|
||||
# do some processing, call exteranl APIs, and return a device_data list
|
||||
# ...
|
||||
#
|
||||
# Sample data for testing purposes, you can adjust the processing in main() as needed
|
||||
# ... before adding it to the plugin_objects.add_object(...)
|
||||
device_data = [
|
||||
{
|
||||
'device_id': 'device1',
|
||||
'mac_address': '00:11:22:33:44:55',
|
||||
'ip_address': '192.168.1.2',
|
||||
'hostname': 'iPhone 12',
|
||||
'vendor': 'Apple Inc.',
|
||||
'device_type': 'Smartphone',
|
||||
'last_seen': '2024-06-27 10:00:00',
|
||||
'port': '1',
|
||||
'network_id': 'network1'
|
||||
},
|
||||
{
|
||||
'device_id': 'device2',
|
||||
'mac_address': '00:11:22:33:44:66',
|
||||
'ip_address': '192.168.1.3',
|
||||
'hostname': 'Moto G82',
|
||||
'vendor': 'Motorola Inc.',
|
||||
'device_type': 'Laptop',
|
||||
'last_seen': '2024-06-27 10:05:00',
|
||||
'port': '',
|
||||
'network_id': 'network1'
|
||||
}
|
||||
]
|
||||
mylog('verbose', [f'[{pluginName}] Site: {site} '])
|
||||
site_id = site["id"]
|
||||
site_name = site.get("name", "Unnamed Site")
|
||||
|
||||
mylog('verbose', [f'[{pluginName}] Site: {site_name} ({site_id})'])
|
||||
|
||||
# --- Devices ---
|
||||
unifi_devices_resp = api.get_unifi_devices(site_id)
|
||||
unifi_devices = unifi_devices_resp.get("data", [])
|
||||
mylog('verbose', [f'[{pluginName}] Site: {site_name} unifi devices: {json.dumps(unifi_devices_resp, indent=2)}'])
|
||||
|
||||
# --- Clients ---
|
||||
clients_resp = api.get_clients(site_id)
|
||||
clients = clients_resp.get("data", [])
|
||||
mylog('verbose', [f'[{pluginName}] Site: {site_name} clients: {json.dumps(clients_resp, indent=2)}'])
|
||||
|
||||
# Build a lookup for devices by their 'id' to find parent MAC easily
|
||||
device_id_to_mac = {dev['id']: dev.get('macAddress', '') for dev in unifi_devices}
|
||||
|
||||
# Helper to resolve uplinkDeviceId to parent MAC, or "Internet" if no uplink
|
||||
def resolve_parent_mac(uplink_id):
|
||||
if not uplink_id:
|
||||
return "Internet"
|
||||
return device_id_to_mac.get(uplink_id, "Unknown")
|
||||
|
||||
# Process Unifi devices
|
||||
for device in unifi_devices:
|
||||
dev_mac = device.get('macAddress', '')
|
||||
dev_ip = device.get('ipAddress', '')
|
||||
dev_name = device.get('name', '')
|
||||
# Determine device_type based on features and type
|
||||
# If device has "accessPoint" feature => type "AP"
|
||||
# Else if "switching" feature => type "Switch"
|
||||
# fallback to "Unknown"
|
||||
features = device.get('features', [])
|
||||
if 'accessPoint' in features:
|
||||
device_type = 'AP'
|
||||
elif 'switching' in features:
|
||||
device_type = 'Switch'
|
||||
else:
|
||||
device_type = 'Unknown'
|
||||
|
||||
dev_type = device_type
|
||||
# No connectedAt for devices, so empty
|
||||
dev_connected = ''
|
||||
|
||||
uplinkDeviceId = device.get('uplinkDeviceId', '')
|
||||
dev_parent_mac = resolve_parent_mac(uplinkDeviceId)
|
||||
|
||||
device_data.append({
|
||||
"dev_mac": dev_mac,
|
||||
"dev_ip": dev_ip,
|
||||
"dev_name": dev_name,
|
||||
"dev_type": dev_type,
|
||||
"dev_connected": dev_connected,
|
||||
"dev_parent_mac": dev_parent_mac
|
||||
})
|
||||
|
||||
# Process Clients (child devices connected to APs or switches)
|
||||
for client in clients:
|
||||
dev_mac = client.get('macAddress', '')
|
||||
dev_ip = client.get('ipAddress', '')
|
||||
dev_name = client.get('name', '')
|
||||
device_type = ""
|
||||
|
||||
dev_type = device_type
|
||||
dev_connected = client.get('connectedAt', '')
|
||||
|
||||
uplinkDeviceId = client.get('uplinkDeviceId', '')
|
||||
dev_parent_mac = resolve_parent_mac(uplinkDeviceId)
|
||||
|
||||
device_data.append({
|
||||
"dev_mac": dev_mac,
|
||||
"dev_ip": dev_ip,
|
||||
"dev_name": dev_name,
|
||||
"dev_type": dev_type,
|
||||
"dev_connected": dev_connected,
|
||||
"dev_parent_mac": dev_parent_mac
|
||||
})
|
||||
|
||||
# Return the data to be detected by the main application
|
||||
return device_data
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
|
||||
Reference in New Issue
Block a user