From 3ee21ac830a288a63fcc3b56cdb1545d0c505da2 Mon Sep 17 00:00:00 2001 From: "Jokob @NetAlertX" <96159884+jokob-sk@users.noreply.github.com> Date: Wed, 21 Jan 2026 00:17:54 +0000 Subject: [PATCH] review fixes --- front/plugins/dig_scan/config.json | 3 +- front/plugins/luci_import/config.json | 126 +++++++++--------- front/plugins/pihole_scan/config.json | 63 --------- front/plugins/unifi_api_import/config.json | 2 - server/api_server/api_server_start.py | 10 +- server/api_server/openapi/schemas.py | 2 +- server/models/device_instance.py | 40 +++++- server/scan/device_handling.py | 7 + .../test_device_field_lock.py | 86 ++++++++---- 9 files changed, 169 insertions(+), 170 deletions(-) diff --git a/front/plugins/dig_scan/config.json b/front/plugins/dig_scan/config.json index 9405dc07..981d9d34 100755 --- a/front/plugins/dig_scan/config.json +++ b/front/plugins/dig_scan/config.json @@ -83,7 +83,8 @@ } ] }, - { "function": "SET_ALWAYS", + { + "function": "SET_ALWAYS", "type": { "dataType": "array", "elements": [ diff --git a/front/plugins/luci_import/config.json b/front/plugins/luci_import/config.json index 14939ef9..f70be1a6 100755 --- a/front/plugins/luci_import/config.json +++ b/front/plugins/luci_import/config.json @@ -383,7 +383,69 @@ "string": "Retrieve only devices that are reachable." } ] - } + }, + { + "function": "SET_ALWAYS", + "type": { + "dataType": "array", + "elements": [ + { + "elementType": "select", + "elementOptions": [{ "multiple": "true", "orderable": "true" }], + "transformers": [] + } + ] + }, + "default_value": ["devMac", "devLastIP"], + "options": [ + "devMac", + "devLastIP" + ], + "localized": ["name", "description"], + "name": [ + { + "language_code": "en_us", + "string": "Set always columns" + } + ], + "description": [ + { + "language_code": "en_us", + "string": "These columns are always overwritten by this plugin, unless the user locks or overwrites the value." + } + ] + }, + { + "function": "SET_EMPTY", + "type": { + "dataType": "array", + "elements": [ + { + "elementType": "select", + "elementOptions": [{ "multiple": "true", "orderable": "true" }], + "transformers": [] + } + ] + }, + "default_value": [], + "options": [ + "devMac", + "devLastIP" + ], + "localized": ["name", "description"], + "name": [ + { + "language_code": "en_us", + "string": "Set empty columns" + } + ], + "description": [ + { + "language_code": "en_us", + "string": "These columns are only overwritten if they are empty or set to NEWDEV." + } + ] + } ], "database_column_definitions": [ { @@ -568,68 +630,6 @@ "string": "Статус" } ] - }, - { - "function": "SET_ALWAYS", - "type": { - "dataType": "array", - "elements": [ - { - "elementType": "select", - "elementOptions": [{ "multiple": "true", "orderable": "true" }], - "transformers": [] - } - ] - }, - "default_value": ["devMac", "devLastIP"], - "options": [ - "devMac", - "devLastIP" - ], - "localized": ["name", "description"], - "name": [ - { - "language_code": "en_us", - "string": "Set always columns" - } - ], - "description": [ - { - "language_code": "en_us", - "string": "These columns are always overwritten by this plugin, unless the user locks or overwrites the value." - } - ] - }, - { - "function": "SET_EMPTY", - "type": { - "dataType": "array", - "elements": [ - { - "elementType": "select", - "elementOptions": [{ "multiple": "true", "orderable": "true" }], - "transformers": [] - } - ] - }, - "default_value": [], - "options": [ - "devMac", - "devLastIP" - ], - "localized": ["name", "description"], - "name": [ - { - "language_code": "en_us", - "string": "Set empty columns" - } - ], - "description": [ - { - "language_code": "en_us", - "string": "These columns are only overwritten if they are empty or set to NEWDEV." - } - ] } ] } \ No newline at end of file diff --git a/front/plugins/pihole_scan/config.json b/front/plugins/pihole_scan/config.json index cbdc8725..7f98770f 100755 --- a/front/plugins/pihole_scan/config.json +++ b/front/plugins/pihole_scan/config.json @@ -216,36 +216,6 @@ } ] }, - { - "function": "SET_ALWAYS", - "type": { - "dataType": "array", - "elements": [ - { - "elementType": "select", - "elementOptions": [{ "multiple": "true"}], - "transformers": [] - } - ] - }, - "default_value": ["devLastIP"], - "options": [ - "devLastIP" - ], - "localized": ["name", "description"], - "name": [ - { - "language_code": "en_us", - "string": "Set always columns" - } - ], - "description": [ - { - "language_code": "en_us", - "string": "These columns are always overwritten by this plugin, unless the user locks or overwrites the value." - } - ] - }, { "function": "SET_EMPTY", "type": { @@ -309,39 +279,6 @@ } ] }, - { - "function": "SET_EMPTY", - "type": { - "dataType": "array", - "elements": [ - { - "elementType": "select", - "elementOptions": [{ "multiple": "true"}], - "transformers": [] - } - ] - }, - "default_value": ["devName", "devVendor"], - "options": [ - "devMac", - "devLastIP", - "devName", - "devVendor" - ], - "localized": ["name", "description"], - "name": [ - { - "language_code": "en_us", - "string": "Set empty columns" - } - ], - "description": [ - { - "language_code": "en_us", - "string": "These columns are only overwritten if they are empty or set to NEWDEV." - } - ] - }, { "function": "WATCH", "type": { diff --git a/front/plugins/unifi_api_import/config.json b/front/plugins/unifi_api_import/config.json index 80b4abd4..1aedc822 100755 --- a/front/plugins/unifi_api_import/config.json +++ b/front/plugins/unifi_api_import/config.json @@ -40,8 +40,6 @@ } ], "params": [], - } - ], "settings": [ { "function": "RUN", diff --git a/server/api_server/api_server_start.py b/server/api_server/api_server_start.py index 69042030..3187596c 100755 --- a/server/api_server/api_server_start.py +++ b/server/api_server/api_server_start.py @@ -472,24 +472,24 @@ def api_device_field_lock(mac, payload=None): # Validate that the field can be locked source_field = field_name + "Source" allowed_tracked_fields = { - "devMac", "devName", "devLastIP", "devVendor", "devFQDN", + "devMac", "devName", "devLastIP", "devVendor", "devFQDN", "devSSID", "devParentMAC", "devParentPort", "devParentRelType", "devVlan" } if field_name not in allowed_tracked_fields: return jsonify({"success": False, "error": f"Field '{field_name}' cannot be locked"}), 400 device_handler = DeviceInstance() - + try: # When locking: set source to LOCKED # When unlocking: check current value and let plugins take over new_source = "LOCKED" if should_lock else "NEWDEV" result = device_handler.updateDeviceColumn(mac, source_field, new_source) - + if result.get("success"): action = "locked" if should_lock else "unlocked" return jsonify({ - "success": True, + "success": True, "message": f"Field {field_name} {action}", "fieldName": field_name, "locked": should_lock @@ -497,7 +497,7 @@ def api_device_field_lock(mac, payload=None): else: return jsonify(result), 400 except Exception as e: - mylog("error", f"Error locking field {field_name} for {mac}: {str(e)}") + mylog("none", f"Error locking field {field_name} for {mac}: {str(e)}") return jsonify({"success": False, "error": str(e)}), 500 diff --git a/server/api_server/openapi/schemas.py b/server/api_server/openapi/schemas.py index bbc3e369..086f03cf 100644 --- a/server/api_server/openapi/schemas.py +++ b/server/api_server/openapi/schemas.py @@ -275,7 +275,7 @@ class UpdateDeviceColumnRequest(BaseModel): class LockDeviceFieldRequest(BaseModel): """Request to lock/unlock a device field.""" - fieldName: str = Field(..., description="Field name to lock/unlock (devMac, devName, devLastIP, etc.)") + fieldName: Optional[str] = Field(None, description="Field name to lock/unlock (devMac, devName, devLastIP, etc.)") lock: bool = Field(True, description="True to lock the field, False to unlock") diff --git a/server/models/device_instance.py b/server/models/device_instance.py index fda2cfbf..90933e14 100755 --- a/server/models/device_instance.py +++ b/server/models/device_instance.py @@ -595,6 +595,30 @@ class DeviceInstance: cur.execute(sql, values) conn.commit() + if data.get("createNew", False): + # Initialize source-tracking fields on device creation. + # We always mark devMacSource as NEWDEV, and mark other tracked fields + # as NEWDEV only if the create payload provides a non-empty value. + initial_sources = {FIELD_SOURCE_MAP["devMac"]: "NEWDEV"} + for field_name, source_field in FIELD_SOURCE_MAP.items(): + if field_name == "devMac": + continue + field_value = data.get(field_name) + if field_value is None: + continue + if isinstance(field_value, str) and not field_value.strip(): + continue + initial_sources[source_field] = "NEWDEV" + + if initial_sources: + # Apply source updates in a single statement for the newly inserted row. + set_clause = ", ".join([f"{col}=?" for col in initial_sources.keys()]) + source_values = list(initial_sources.values()) + source_values.append(normalized_mac) + source_sql = f"UPDATE Devices SET {set_clause} WHERE devMac = ?" + cur.execute(source_sql, source_values) + conn.commit() + # Enforce source tracking on user updates # User-updated fields should have their *Source set to "USER" user_updated_fields = {k: v for k, v in data.items() if k in FIELD_SOURCE_MAP} @@ -683,26 +707,30 @@ class DeviceInstance: if field_name not in FIELD_SOURCE_MAP: return {"success": False, "error": f"Field {field_name} does not support locking"} + mac_normalized = normalize_mac(mac) + conn = get_temp_db_connection() try: - conn = get_temp_db_connection() - lock_field(mac, field_name, conn) - conn.close() + lock_field(mac_normalized, field_name, conn) return {"success": True, "message": f"Field {field_name} locked"} except Exception as e: return {"success": False, "error": str(e)} + finally: + conn.close() def unlockDeviceField(self, mac, field_name): """Unlock a device field so plugins can overwrite it again.""" if field_name not in FIELD_SOURCE_MAP: return {"success": False, "error": f"Field {field_name} does not support unlocking"} + mac_normalized = normalize_mac(mac) + conn = get_temp_db_connection() try: - conn = get_temp_db_connection() - unlock_field(mac, field_name, conn) - conn.close() + unlock_field(mac_normalized, field_name, conn) return {"success": True, "message": f"Field {field_name} unlocked"} except Exception as e: return {"success": False, "error": str(e)} + finally: + conn.close() def copyDevice(self, mac_from, mac_to): """Copy a device entry from one MAC to another.""" diff --git a/server/scan/device_handling.py b/server/scan/device_handling.py index c8278ac2..f7bf4155 100755 --- a/server/scan/device_handling.py +++ b/server/scan/device_handling.py @@ -9,6 +9,7 @@ from models.device_instance import DeviceInstance from scan.name_resolution import NameResolver from scan.device_heuristics import guess_icon, guess_type from db.db_helper import sanitize_SQL_input, list_to_where, safe_int +from helper import format_ip_long # Make sure log level is initialized correctly Logger(get_setting_value("LOG_LEVEL")) @@ -544,6 +545,12 @@ def create_new_devices(db): # Derive primary IP family values cur_IP = str(cur_IP).strip() if cur_IP else "" cur_IP_normalized = check_IP_format(cur_IP) if ":" not in cur_IP else cur_IP + + # Validate IPv6 addresses using format_ip_long for consistency + if ":" in cur_IP_normalized: + validated_ipv6 = format_ip_long(cur_IP_normalized) + cur_IP_normalized = validated_ipv6 if validated_ipv6 else "" + primary_ipv4 = cur_IP_normalized if cur_IP_normalized and ":" not in cur_IP_normalized else "" primary_ipv6 = cur_IP_normalized if cur_IP_normalized and ":" in cur_IP_normalized else "" diff --git a/test/authoritative_fields/test_device_field_lock.py b/test/authoritative_fields/test_device_field_lock.py index 7c2946d9..b1361fcc 100644 --- a/test/authoritative_fields/test_device_field_lock.py +++ b/test/authoritative_fields/test_device_field_lock.py @@ -46,16 +46,16 @@ def cleanup_test_device(test_mac): # Clean before test try: device_handler.deleteDeviceByMAC(test_mac) - except Exception: - pass - + except Exception as e: + pytest.fail(f"Pre-test cleanup failed for {test_mac}: {e}") + yield - + # Clean after test try: device_handler.deleteDeviceByMAC(test_mac) - except Exception: - pass + except Exception as e: + pytest.fail(f"Post-test cleanup failed for {test_mac}: {e}") class TestDeviceFieldLock: @@ -119,12 +119,12 @@ class TestDeviceFieldLock: """Lock each tracked field individually.""" # First create device self.test_create_test_device(client, test_mac, auth_headers) - + tracked_fields = [ "devMac", "devName", "devLastIP", "devVendor", "devFQDN", "devSSID", "devParentMAC", "devParentPort", "devParentRelType", "devVlan" ] - + for field_name in tracked_fields: payload = {"fieldName": field_name, "lock": True} resp = client.post( @@ -142,7 +142,7 @@ class TestDeviceFieldLock: """Lock a field then unlock it.""" # Create device self.test_create_test_device(client, test_mac, auth_headers) - + # Lock field lock_payload = {"fieldName": "devName", "lock": True} resp = client.post( @@ -152,13 +152,13 @@ class TestDeviceFieldLock: ) assert resp.status_code == 200 assert resp.json.get("locked") is True - + # Verify source is LOCKED resp = client.get(f"/device/{test_mac}", headers=auth_headers) assert resp.status_code == 200 device_data = resp.json assert device_data.get("devNameSource") == "LOCKED" - + # Unlock field unlock_payload = {"fieldName": "devName", "lock": False} resp = client.post( @@ -168,7 +168,7 @@ class TestDeviceFieldLock: ) assert resp.status_code == 200 assert resp.json.get("locked") is False - + # Verify source changed resp = client.get(f"/device/{test_mac}", headers=auth_headers) assert resp.status_code == 200 @@ -179,7 +179,7 @@ class TestDeviceFieldLock: """Locked field should not be updated through API.""" # Create device with initial name self.test_create_test_device(client, test_mac, auth_headers) - + # Lock the field lock_payload = {"fieldName": "devName", "lock": True} resp = client.post( @@ -188,7 +188,7 @@ class TestDeviceFieldLock: headers=auth_headers ) assert resp.status_code == 200 - + # Try to update the locked field update_payload = {"devName": "New Name"} resp = client.post( @@ -196,7 +196,7 @@ class TestDeviceFieldLock: json=update_payload, headers=auth_headers ) - + # Update should succeed at API level but authoritative handler should prevent it # The field update logic checks source in the database layer # For now verify the API accepts the request @@ -206,7 +206,7 @@ class TestDeviceFieldLock: """Lock some fields while leaving others unlocked.""" # Create device self.test_create_test_device(client, test_mac, auth_headers) - + # Lock only devName and devVendor for field in ["devName", "devVendor"]: payload = {"fieldName": field, "lock": True} @@ -216,16 +216,16 @@ class TestDeviceFieldLock: headers=auth_headers ) assert resp.status_code == 200 - + # Verify device state resp = client.get(f"/device/{test_mac}", headers=auth_headers) assert resp.status_code == 200 device_data = resp.json - + # Locked fields should have LOCKED source assert device_data.get("devNameSource") == "LOCKED" assert device_data.get("devVendorSource") == "LOCKED" - + # Other fields should not be locked assert device_data.get("devLastIPSource") != "LOCKED" assert device_data.get("devFQDNSource") != "LOCKED" @@ -234,9 +234,9 @@ class TestDeviceFieldLock: """Locking the same field multiple times should work.""" # Create device self.test_create_test_device(client, test_mac, auth_headers) - + payload = {"fieldName": "devName", "lock": True} - + # Lock once resp1 = client.post( f"/device/{test_mac}/field/lock", @@ -244,7 +244,7 @@ class TestDeviceFieldLock: headers=auth_headers ) assert resp1.status_code == 200 - + # Lock again resp2 = client.post( f"/device/{test_mac}/field/lock", @@ -269,10 +269,38 @@ class TestDeviceFieldLock: class TestFieldLockIntegration: """Integration tests for field locking with plugin overwrites.""" + def test_lock_unlock_normalizes_mac(self, test_mac): + """Lock/unlock should normalize MAC addresses before DB updates.""" + device_handler = DeviceInstance() + + create_result = device_handler.setDeviceData( + test_mac, + { + "devName": "Original Name", + "devLastIP": "192.168.1.100", + "createNew": True, + }, + ) + assert create_result.get("success") is True + + mac_variant = "aa-bb-cc-dd-ee-ff" + + lock_result = device_handler.lockDeviceField(mac_variant, "devName") + assert lock_result.get("success") is True + + device_data = device_handler.getDeviceData(test_mac) + assert device_data.get("devNameSource") == "LOCKED" + + unlock_result = device_handler.unlockDeviceField(mac_variant, "devName") + assert unlock_result.get("success") is True + + device_data = device_handler.getDeviceData(test_mac) + assert device_data.get("devNameSource") != "LOCKED" + def test_locked_field_blocks_plugin_overwrite(self, test_mac, auth_headers): """Verify locked fields prevent plugin source overwrites.""" device_handler = DeviceInstance() - + # Create device create_result = device_handler.setDeviceData(test_mac, { "devName": "Original Name", @@ -280,10 +308,10 @@ class TestFieldLockIntegration: "createNew": True }) assert create_result.get("success") is True - + # Lock the field device_handler.updateDeviceColumn(test_mac, "devNameSource", "LOCKED") - + # Try to overwrite with plugin source (this would be done by authoritative handler) # For now, verify the source is stored correctly device_data = device_handler.getDeviceData(test_mac) @@ -292,7 +320,7 @@ class TestFieldLockIntegration: def test_field_source_tracking(self, test_mac, auth_headers): """Verify field source is tracked correctly.""" device_handler = DeviceInstance() - + # Create device create_result = device_handler.setDeviceData(test_mac, { "devName": "Test Device", @@ -300,17 +328,17 @@ class TestFieldLockIntegration: "createNew": True }) assert create_result.get("success") is True - + # Verify initial source device_data = device_handler.getDeviceData(test_mac) assert device_data.get("devNameSource") == "NEWDEV" - + # Update field (should set source to USER) update_result = device_handler.setDeviceData(test_mac, { "devName": "Updated Name" }) assert update_result.get("success") is True - + # Verify source changed to USER device_data = device_handler.getDeviceData(test_mac) assert device_data.get("devNameSource") == "USER"