diff --git a/front/plugins/plugin_helper.py b/front/plugins/plugin_helper.py index e3c7b632..ceb9cd8b 100755 --- a/front/plugins/plugin_helper.py +++ b/front/plugins/plugin_helper.py @@ -4,6 +4,7 @@ from pytz import timezone, all_timezones, UnknownTimeZoneError import sys import re import base64 +import json from datetime import datetime INSTALL_PATH = "/app" @@ -116,6 +117,51 @@ def decodeBase64(inputParamBase64): return result +# ------------------------------------------------------------------- +def decode_settings_base64(encoded_str, convert_types=True): + """ + Decodes a base64-encoded JSON list of settings into a dict. + + Each setting entry format: + [group, key, type, value] + + Example: + [ + ["group", "name", "string", "Home - local"], + ["group", "base_url", "string", "https://..."], + ["group", "api_version", "integer", "2"], + ["group", "verify_ssl", "boolean", "False"] + ] + + Returns: + { + "name": "Home - local", + "base_url": "https://...", + "api_version": 2, + "verify_ssl": False + } + """ + decoded_json = base64.b64decode(encoded_str).decode("utf-8") + settings_list = json.loads(decoded_json) + + settings_dict = {} + for _, key, _type, value in settings_list: + if convert_types: + _type_lower = _type.lower() + if _type_lower == "boolean": + settings_dict[key] = value.lower() == "true" + elif _type_lower == "integer": + settings_dict[key] = int(value) + elif _type_lower == "float": + settings_dict[key] = float(value) + else: + settings_dict[key] = value + else: + settings_dict[key] = value + + return settings_dict + + # ------------------------------------------------------------------- def normalize_mac(mac): # Split the MAC address by colon (:) or hyphen (-) and convert each part to uppercase diff --git a/front/plugins/unifi_api_import/config.json b/front/plugins/unifi_api_import/config.json index bd3ef1f7..0e6bcd40 100755 --- a/front/plugins/unifi_api_import/config.json +++ b/front/plugins/unifi_api_import/config.json @@ -573,24 +573,6 @@ }, { "column": "Watched_Value2", - "mapped_to_column": "cur_Vendor", - "css_classes": "col-sm-2", - "show": true, - "type": "label", - "default_value": "", - "options": [], - "localized": [ - "name" - ], - "name": [ - { - "language_code": "en_us", - "string": "Vendor" - } - ] - }, - { - "column": "Watched_Value3", "mapped_to_column": "cur_Type", "css_classes": "col-sm-2", "show": true, @@ -608,9 +590,9 @@ ] }, { - "column": "Watched_Value4", + "column": "Watched_Value3", "css_classes": "col-sm-2", - "show": false, + "show": true, "type": "label", "default_value": "", "options": [], @@ -620,7 +602,25 @@ "name": [ { "language_code": "en_us", - "string": "N/A" + "string": "Connected" + } + ] + }, + { + "column": "Watched_Value4", + "mapped_to_column": "cur_NetworkNodeMAC", + "css_classes": "col-sm-2", + "show": true, + "type": "device_mac", + "default_value": "", + "options": [], + "localized": [ + "name" + ], + "name": [ + { + "language_code": "en_us", + "string": "Parent" } ] }, @@ -628,7 +628,7 @@ "column": "Dummy", "mapped_to_column": "cur_ScanMethod", "mapped_to_column_data": { - "value": "Example Plugin" + "value": "UNIFIAPI" }, "css_classes": "col-sm-2", "show": false, diff --git a/front/plugins/unifi_api_import/unifi_api_import.py b/front/plugins/unifi_api_import/unifi_api_import.py index 49f2a760..b9c48c5a 100755 --- a/front/plugins/unifi_api_import/unifi_api_import.py +++ b/front/plugins/unifi_api_import/unifi_api_import.py @@ -6,12 +6,13 @@ import sys import json import sqlite3 from pytz import timezone +from unifi_sm_api.api import SiteManagerAPI # Define the installation path and extend the system path for plugin imports INSTALL_PATH = "/app" sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"]) -from plugin_helper import Plugin_Object, Plugin_Objects, decodeBase64 +from plugin_helper import Plugin_Object, Plugin_Objects, decodeBase64, decode_settings_base64 from plugin_utils import get_plugins_configs from logger import mylog, Logger from const import pluginsPath, fullDbPath, logPath @@ -26,7 +27,7 @@ conf.tz = timezone(get_setting_value('TIMEZONE')) # Make sure log level is initialized correctly Logger(get_setting_value('LOG_LEVEL')) -pluginName = '' +pluginName = 'UNIFIAPI' # Define the current path and log file paths LOG_PATH = logPath + '/plugins' @@ -42,83 +43,142 @@ def main(): mylog('verbose', [f'[{pluginName}] In script']) # Retrieve configuration settings - some_setting = get_setting_value('SYNC_plugins') + unifi_sites_configs = get_setting_value('UNIFIAPI_sites') - mylog('verbose', [f'[{pluginName}] some_setting value {some_setting}']) + mylog('verbose', [f'[{pluginName}] number of unifi_sites_configs: {len(unifi_sites_configs)}']) - # retrieve data - device_data = get_device_data(some_setting) + + for site_config in unifi_sites_configs: - # Process the data into native application tables - if len(device_data) > 0: + siteDict = decode_settings_base64(site_config) - # insert devices into the lats_result.log - # make sure the below mapping is mapped in config.json, for example: - #"database_column_definitions": [ - # { - # "column": "Object_PrimaryID", <--------- the value I save into primaryId - # "mapped_to_column": "cur_MAC", <--------- gets inserted into the CurrentScan DB table column cur_MAC - # - for device in device_data: - plugin_objects.add_object( - primaryId = device['mac_address'], - secondaryId = device['ip_address'], - watched1 = device['hostname'], - watched2 = device['vendor'], - watched3 = device['device_type'], - watched4 = device['last_seen'], - extra = '', - foreignKey = device['mac_address'] - # helpVal1 = "Something1", # Optional Helper values to be passed for mapping into the app - # helpVal2 = "Something1", # If you need to use even only 1, add the remaining ones too - # helpVal3 = "Something1", # and set them to 'null'. Check the the docs for details: - # helpVal4 = "Something1", # https://github.com/jokob-sk/NetAlertX/blob/main/docs/PLUGINS_DEV.md - ) + mylog('verbose', [f'[{pluginName}] siteDict: {json.dumps(siteDict)}']) + mylog('none', [f'[{pluginName}] Connecting to: {siteDict["name"]}']) - mylog('verbose', [f'[{pluginName}] New entries: "{len(device_data)}"']) + api = SiteManagerAPI( + api_key=siteDict["api_key"], + version=siteDict["api_version"], + base_url=siteDict["base_url"], + verify_ssl=siteDict["verify_ssl"] + ) - # log result - plugin_objects.write_result_file() + sites_resp = api.get_sites() + sites = sites_resp.get("data", []) + + for site in sites: + + # retrieve data + device_data = get_device_data(site, api) + + # Process the data into native application tables + if len(device_data) > 0: + + # insert devices into the lats_result.log + for device in device_data: + plugin_objects.add_object( + primaryId = device['dev_mac'], # mac + secondaryId = device['dev_ip'], # IP + watched1 = device['dev_name'], # name + watched2 = device['dev_type'], # device_type (AP/Switch etc) + watched3 = device['dev_connected'], # connectedAt or empty + watched4 = device['dev_parent_mac'],# parent_mac or "Internet" + extra = '', + foreignKey = device['dev_mac'] + ) + + mylog('verbose', [f'[{pluginName}] New entries: "{len(device_data)}"']) + + # log result + plugin_objects.write_result_file() return 0 # retrieve data -def get_device_data(some_setting): - +def get_device_data(site, api): device_data = [] - # do some processing, call exteranl APIs, and return a device_data list - # ... - # - # Sample data for testing purposes, you can adjust the processing in main() as needed - # ... before adding it to the plugin_objects.add_object(...) - device_data = [ - { - 'device_id': 'device1', - 'mac_address': '00:11:22:33:44:55', - 'ip_address': '192.168.1.2', - 'hostname': 'iPhone 12', - 'vendor': 'Apple Inc.', - 'device_type': 'Smartphone', - 'last_seen': '2024-06-27 10:00:00', - 'port': '1', - 'network_id': 'network1' - }, - { - 'device_id': 'device2', - 'mac_address': '00:11:22:33:44:66', - 'ip_address': '192.168.1.3', - 'hostname': 'Moto G82', - 'vendor': 'Motorola Inc.', - 'device_type': 'Laptop', - 'last_seen': '2024-06-27 10:05:00', - 'port': '', - 'network_id': 'network1' - } - ] + mylog('verbose', [f'[{pluginName}] Site: {site} ']) + site_id = site["id"] + site_name = site.get("name", "Unnamed Site") + + mylog('verbose', [f'[{pluginName}] Site: {site_name} ({site_id})']) + + # --- Devices --- + unifi_devices_resp = api.get_unifi_devices(site_id) + unifi_devices = unifi_devices_resp.get("data", []) + mylog('verbose', [f'[{pluginName}] Site: {site_name} unifi devices: {json.dumps(unifi_devices_resp, indent=2)}']) + + # --- Clients --- + clients_resp = api.get_clients(site_id) + clients = clients_resp.get("data", []) + mylog('verbose', [f'[{pluginName}] Site: {site_name} clients: {json.dumps(clients_resp, indent=2)}']) + + # Build a lookup for devices by their 'id' to find parent MAC easily + device_id_to_mac = {dev['id']: dev.get('macAddress', '') for dev in unifi_devices} + + # Helper to resolve uplinkDeviceId to parent MAC, or "Internet" if no uplink + def resolve_parent_mac(uplink_id): + if not uplink_id: + return "Internet" + return device_id_to_mac.get(uplink_id, "Unknown") + + # Process Unifi devices + for device in unifi_devices: + dev_mac = device.get('macAddress', '') + dev_ip = device.get('ipAddress', '') + dev_name = device.get('name', '') + # Determine device_type based on features and type + # If device has "accessPoint" feature => type "AP" + # Else if "switching" feature => type "Switch" + # fallback to "Unknown" + features = device.get('features', []) + if 'accessPoint' in features: + device_type = 'AP' + elif 'switching' in features: + device_type = 'Switch' + else: + device_type = 'Unknown' + + dev_type = device_type + # No connectedAt for devices, so empty + dev_connected = '' + + uplinkDeviceId = device.get('uplinkDeviceId', '') + dev_parent_mac = resolve_parent_mac(uplinkDeviceId) + + device_data.append({ + "dev_mac": dev_mac, + "dev_ip": dev_ip, + "dev_name": dev_name, + "dev_type": dev_type, + "dev_connected": dev_connected, + "dev_parent_mac": dev_parent_mac + }) + + # Process Clients (child devices connected to APs or switches) + for client in clients: + dev_mac = client.get('macAddress', '') + dev_ip = client.get('ipAddress', '') + dev_name = client.get('name', '') + device_type = "" + + dev_type = device_type + dev_connected = client.get('connectedAt', '') + + uplinkDeviceId = client.get('uplinkDeviceId', '') + dev_parent_mac = resolve_parent_mac(uplinkDeviceId) + + device_data.append({ + "dev_mac": dev_mac, + "dev_ip": dev_ip, + "dev_name": dev_name, + "dev_type": dev_type, + "dev_connected": dev_connected, + "dev_parent_mac": dev_parent_mac + }) - # Return the data to be detected by the main application return device_data + if __name__ == '__main__': main()