review fixes

This commit is contained in:
Jokob @NetAlertX
2026-01-21 00:17:54 +00:00
parent 22695a633c
commit 3ee21ac830
9 changed files with 169 additions and 170 deletions

View File

@@ -83,7 +83,8 @@
} }
] ]
}, },
{ "function": "SET_ALWAYS", {
"function": "SET_ALWAYS",
"type": { "type": {
"dataType": "array", "dataType": "array",
"elements": [ "elements": [

View File

@@ -383,7 +383,69 @@
"string": "Retrieve only devices that are reachable." "string": "Retrieve only devices that are reachable."
} }
] ]
} },
{
"function": "SET_ALWAYS",
"type": {
"dataType": "array",
"elements": [
{
"elementType": "select",
"elementOptions": [{ "multiple": "true", "orderable": "true" }],
"transformers": []
}
]
},
"default_value": ["devMac", "devLastIP"],
"options": [
"devMac",
"devLastIP"
],
"localized": ["name", "description"],
"name": [
{
"language_code": "en_us",
"string": "Set always columns"
}
],
"description": [
{
"language_code": "en_us",
"string": "These columns are always overwritten by this plugin, unless the user locks or overwrites the value."
}
]
},
{
"function": "SET_EMPTY",
"type": {
"dataType": "array",
"elements": [
{
"elementType": "select",
"elementOptions": [{ "multiple": "true", "orderable": "true" }],
"transformers": []
}
]
},
"default_value": [],
"options": [
"devMac",
"devLastIP"
],
"localized": ["name", "description"],
"name": [
{
"language_code": "en_us",
"string": "Set empty columns"
}
],
"description": [
{
"language_code": "en_us",
"string": "These columns are only overwritten if they are empty or set to NEWDEV."
}
]
}
], ],
"database_column_definitions": [ "database_column_definitions": [
{ {
@@ -568,68 +630,6 @@
"string": "Статус" "string": "Статус"
} }
] ]
},
{
"function": "SET_ALWAYS",
"type": {
"dataType": "array",
"elements": [
{
"elementType": "select",
"elementOptions": [{ "multiple": "true", "orderable": "true" }],
"transformers": []
}
]
},
"default_value": ["devMac", "devLastIP"],
"options": [
"devMac",
"devLastIP"
],
"localized": ["name", "description"],
"name": [
{
"language_code": "en_us",
"string": "Set always columns"
}
],
"description": [
{
"language_code": "en_us",
"string": "These columns are always overwritten by this plugin, unless the user locks or overwrites the value."
}
]
},
{
"function": "SET_EMPTY",
"type": {
"dataType": "array",
"elements": [
{
"elementType": "select",
"elementOptions": [{ "multiple": "true", "orderable": "true" }],
"transformers": []
}
]
},
"default_value": [],
"options": [
"devMac",
"devLastIP"
],
"localized": ["name", "description"],
"name": [
{
"language_code": "en_us",
"string": "Set empty columns"
}
],
"description": [
{
"language_code": "en_us",
"string": "These columns are only overwritten if they are empty or set to NEWDEV."
}
]
} }
] ]
} }

View File

@@ -216,36 +216,6 @@
} }
] ]
}, },
{
"function": "SET_ALWAYS",
"type": {
"dataType": "array",
"elements": [
{
"elementType": "select",
"elementOptions": [{ "multiple": "true"}],
"transformers": []
}
]
},
"default_value": ["devLastIP"],
"options": [
"devLastIP"
],
"localized": ["name", "description"],
"name": [
{
"language_code": "en_us",
"string": "Set always columns"
}
],
"description": [
{
"language_code": "en_us",
"string": "These columns are always overwritten by this plugin, unless the user locks or overwrites the value."
}
]
},
{ {
"function": "SET_EMPTY", "function": "SET_EMPTY",
"type": { "type": {
@@ -309,39 +279,6 @@
} }
] ]
}, },
{
"function": "SET_EMPTY",
"type": {
"dataType": "array",
"elements": [
{
"elementType": "select",
"elementOptions": [{ "multiple": "true"}],
"transformers": []
}
]
},
"default_value": ["devName", "devVendor"],
"options": [
"devMac",
"devLastIP",
"devName",
"devVendor"
],
"localized": ["name", "description"],
"name": [
{
"language_code": "en_us",
"string": "Set empty columns"
}
],
"description": [
{
"language_code": "en_us",
"string": "These columns are only overwritten if they are empty or set to NEWDEV."
}
]
},
{ {
"function": "WATCH", "function": "WATCH",
"type": { "type": {

View File

@@ -40,8 +40,6 @@
} }
], ],
"params": [], "params": [],
}
],
"settings": [ "settings": [
{ {
"function": "RUN", "function": "RUN",

View File

@@ -472,24 +472,24 @@ def api_device_field_lock(mac, payload=None):
# Validate that the field can be locked # Validate that the field can be locked
source_field = field_name + "Source" source_field = field_name + "Source"
allowed_tracked_fields = { allowed_tracked_fields = {
"devMac", "devName", "devLastIP", "devVendor", "devFQDN", "devMac", "devName", "devLastIP", "devVendor", "devFQDN",
"devSSID", "devParentMAC", "devParentPort", "devParentRelType", "devVlan" "devSSID", "devParentMAC", "devParentPort", "devParentRelType", "devVlan"
} }
if field_name not in allowed_tracked_fields: if field_name not in allowed_tracked_fields:
return jsonify({"success": False, "error": f"Field '{field_name}' cannot be locked"}), 400 return jsonify({"success": False, "error": f"Field '{field_name}' cannot be locked"}), 400
device_handler = DeviceInstance() device_handler = DeviceInstance()
try: try:
# When locking: set source to LOCKED # When locking: set source to LOCKED
# When unlocking: check current value and let plugins take over # When unlocking: check current value and let plugins take over
new_source = "LOCKED" if should_lock else "NEWDEV" new_source = "LOCKED" if should_lock else "NEWDEV"
result = device_handler.updateDeviceColumn(mac, source_field, new_source) result = device_handler.updateDeviceColumn(mac, source_field, new_source)
if result.get("success"): if result.get("success"):
action = "locked" if should_lock else "unlocked" action = "locked" if should_lock else "unlocked"
return jsonify({ return jsonify({
"success": True, "success": True,
"message": f"Field {field_name} {action}", "message": f"Field {field_name} {action}",
"fieldName": field_name, "fieldName": field_name,
"locked": should_lock "locked": should_lock
@@ -497,7 +497,7 @@ def api_device_field_lock(mac, payload=None):
else: else:
return jsonify(result), 400 return jsonify(result), 400
except Exception as e: except Exception as e:
mylog("error", f"Error locking field {field_name} for {mac}: {str(e)}") mylog("none", f"Error locking field {field_name} for {mac}: {str(e)}")
return jsonify({"success": False, "error": str(e)}), 500 return jsonify({"success": False, "error": str(e)}), 500

View File

@@ -275,7 +275,7 @@ class UpdateDeviceColumnRequest(BaseModel):
class LockDeviceFieldRequest(BaseModel): class LockDeviceFieldRequest(BaseModel):
"""Request to lock/unlock a device field.""" """Request to lock/unlock a device field."""
fieldName: str = Field(..., description="Field name to lock/unlock (devMac, devName, devLastIP, etc.)") fieldName: Optional[str] = Field(None, description="Field name to lock/unlock (devMac, devName, devLastIP, etc.)")
lock: bool = Field(True, description="True to lock the field, False to unlock") lock: bool = Field(True, description="True to lock the field, False to unlock")

View File

@@ -595,6 +595,30 @@ class DeviceInstance:
cur.execute(sql, values) cur.execute(sql, values)
conn.commit() conn.commit()
if data.get("createNew", False):
# Initialize source-tracking fields on device creation.
# We always mark devMacSource as NEWDEV, and mark other tracked fields
# as NEWDEV only if the create payload provides a non-empty value.
initial_sources = {FIELD_SOURCE_MAP["devMac"]: "NEWDEV"}
for field_name, source_field in FIELD_SOURCE_MAP.items():
if field_name == "devMac":
continue
field_value = data.get(field_name)
if field_value is None:
continue
if isinstance(field_value, str) and not field_value.strip():
continue
initial_sources[source_field] = "NEWDEV"
if initial_sources:
# Apply source updates in a single statement for the newly inserted row.
set_clause = ", ".join([f"{col}=?" for col in initial_sources.keys()])
source_values = list(initial_sources.values())
source_values.append(normalized_mac)
source_sql = f"UPDATE Devices SET {set_clause} WHERE devMac = ?"
cur.execute(source_sql, source_values)
conn.commit()
# Enforce source tracking on user updates # Enforce source tracking on user updates
# User-updated fields should have their *Source set to "USER" # User-updated fields should have their *Source set to "USER"
user_updated_fields = {k: v for k, v in data.items() if k in FIELD_SOURCE_MAP} user_updated_fields = {k: v for k, v in data.items() if k in FIELD_SOURCE_MAP}
@@ -683,26 +707,30 @@ class DeviceInstance:
if field_name not in FIELD_SOURCE_MAP: if field_name not in FIELD_SOURCE_MAP:
return {"success": False, "error": f"Field {field_name} does not support locking"} return {"success": False, "error": f"Field {field_name} does not support locking"}
mac_normalized = normalize_mac(mac)
conn = get_temp_db_connection()
try: try:
conn = get_temp_db_connection() lock_field(mac_normalized, field_name, conn)
lock_field(mac, field_name, conn)
conn.close()
return {"success": True, "message": f"Field {field_name} locked"} return {"success": True, "message": f"Field {field_name} locked"}
except Exception as e: except Exception as e:
return {"success": False, "error": str(e)} return {"success": False, "error": str(e)}
finally:
conn.close()
def unlockDeviceField(self, mac, field_name): def unlockDeviceField(self, mac, field_name):
"""Unlock a device field so plugins can overwrite it again.""" """Unlock a device field so plugins can overwrite it again."""
if field_name not in FIELD_SOURCE_MAP: if field_name not in FIELD_SOURCE_MAP:
return {"success": False, "error": f"Field {field_name} does not support unlocking"} return {"success": False, "error": f"Field {field_name} does not support unlocking"}
mac_normalized = normalize_mac(mac)
conn = get_temp_db_connection()
try: try:
conn = get_temp_db_connection() unlock_field(mac_normalized, field_name, conn)
unlock_field(mac, field_name, conn)
conn.close()
return {"success": True, "message": f"Field {field_name} unlocked"} return {"success": True, "message": f"Field {field_name} unlocked"}
except Exception as e: except Exception as e:
return {"success": False, "error": str(e)} return {"success": False, "error": str(e)}
finally:
conn.close()
def copyDevice(self, mac_from, mac_to): def copyDevice(self, mac_from, mac_to):
"""Copy a device entry from one MAC to another.""" """Copy a device entry from one MAC to another."""

View File

@@ -9,6 +9,7 @@ from models.device_instance import DeviceInstance
from scan.name_resolution import NameResolver from scan.name_resolution import NameResolver
from scan.device_heuristics import guess_icon, guess_type from scan.device_heuristics import guess_icon, guess_type
from db.db_helper import sanitize_SQL_input, list_to_where, safe_int from db.db_helper import sanitize_SQL_input, list_to_where, safe_int
from helper import format_ip_long
# Make sure log level is initialized correctly # Make sure log level is initialized correctly
Logger(get_setting_value("LOG_LEVEL")) Logger(get_setting_value("LOG_LEVEL"))
@@ -544,6 +545,12 @@ def create_new_devices(db):
# Derive primary IP family values # Derive primary IP family values
cur_IP = str(cur_IP).strip() if cur_IP else "" cur_IP = str(cur_IP).strip() if cur_IP else ""
cur_IP_normalized = check_IP_format(cur_IP) if ":" not in cur_IP else cur_IP cur_IP_normalized = check_IP_format(cur_IP) if ":" not in cur_IP else cur_IP
# Validate IPv6 addresses using format_ip_long for consistency
if ":" in cur_IP_normalized:
validated_ipv6 = format_ip_long(cur_IP_normalized)
cur_IP_normalized = validated_ipv6 if validated_ipv6 else ""
primary_ipv4 = cur_IP_normalized if cur_IP_normalized and ":" not in cur_IP_normalized else "" primary_ipv4 = cur_IP_normalized if cur_IP_normalized and ":" not in cur_IP_normalized else ""
primary_ipv6 = cur_IP_normalized if cur_IP_normalized and ":" in cur_IP_normalized else "" primary_ipv6 = cur_IP_normalized if cur_IP_normalized and ":" in cur_IP_normalized else ""

View File

@@ -46,16 +46,16 @@ def cleanup_test_device(test_mac):
# Clean before test # Clean before test
try: try:
device_handler.deleteDeviceByMAC(test_mac) device_handler.deleteDeviceByMAC(test_mac)
except Exception: except Exception as e:
pass pytest.fail(f"Pre-test cleanup failed for {test_mac}: {e}")
yield yield
# Clean after test # Clean after test
try: try:
device_handler.deleteDeviceByMAC(test_mac) device_handler.deleteDeviceByMAC(test_mac)
except Exception: except Exception as e:
pass pytest.fail(f"Post-test cleanup failed for {test_mac}: {e}")
class TestDeviceFieldLock: class TestDeviceFieldLock:
@@ -119,12 +119,12 @@ class TestDeviceFieldLock:
"""Lock each tracked field individually.""" """Lock each tracked field individually."""
# First create device # First create device
self.test_create_test_device(client, test_mac, auth_headers) self.test_create_test_device(client, test_mac, auth_headers)
tracked_fields = [ tracked_fields = [
"devMac", "devName", "devLastIP", "devVendor", "devFQDN", "devMac", "devName", "devLastIP", "devVendor", "devFQDN",
"devSSID", "devParentMAC", "devParentPort", "devParentRelType", "devVlan" "devSSID", "devParentMAC", "devParentPort", "devParentRelType", "devVlan"
] ]
for field_name in tracked_fields: for field_name in tracked_fields:
payload = {"fieldName": field_name, "lock": True} payload = {"fieldName": field_name, "lock": True}
resp = client.post( resp = client.post(
@@ -142,7 +142,7 @@ class TestDeviceFieldLock:
"""Lock a field then unlock it.""" """Lock a field then unlock it."""
# Create device # Create device
self.test_create_test_device(client, test_mac, auth_headers) self.test_create_test_device(client, test_mac, auth_headers)
# Lock field # Lock field
lock_payload = {"fieldName": "devName", "lock": True} lock_payload = {"fieldName": "devName", "lock": True}
resp = client.post( resp = client.post(
@@ -152,13 +152,13 @@ class TestDeviceFieldLock:
) )
assert resp.status_code == 200 assert resp.status_code == 200
assert resp.json.get("locked") is True assert resp.json.get("locked") is True
# Verify source is LOCKED # Verify source is LOCKED
resp = client.get(f"/device/{test_mac}", headers=auth_headers) resp = client.get(f"/device/{test_mac}", headers=auth_headers)
assert resp.status_code == 200 assert resp.status_code == 200
device_data = resp.json device_data = resp.json
assert device_data.get("devNameSource") == "LOCKED" assert device_data.get("devNameSource") == "LOCKED"
# Unlock field # Unlock field
unlock_payload = {"fieldName": "devName", "lock": False} unlock_payload = {"fieldName": "devName", "lock": False}
resp = client.post( resp = client.post(
@@ -168,7 +168,7 @@ class TestDeviceFieldLock:
) )
assert resp.status_code == 200 assert resp.status_code == 200
assert resp.json.get("locked") is False assert resp.json.get("locked") is False
# Verify source changed # Verify source changed
resp = client.get(f"/device/{test_mac}", headers=auth_headers) resp = client.get(f"/device/{test_mac}", headers=auth_headers)
assert resp.status_code == 200 assert resp.status_code == 200
@@ -179,7 +179,7 @@ class TestDeviceFieldLock:
"""Locked field should not be updated through API.""" """Locked field should not be updated through API."""
# Create device with initial name # Create device with initial name
self.test_create_test_device(client, test_mac, auth_headers) self.test_create_test_device(client, test_mac, auth_headers)
# Lock the field # Lock the field
lock_payload = {"fieldName": "devName", "lock": True} lock_payload = {"fieldName": "devName", "lock": True}
resp = client.post( resp = client.post(
@@ -188,7 +188,7 @@ class TestDeviceFieldLock:
headers=auth_headers headers=auth_headers
) )
assert resp.status_code == 200 assert resp.status_code == 200
# Try to update the locked field # Try to update the locked field
update_payload = {"devName": "New Name"} update_payload = {"devName": "New Name"}
resp = client.post( resp = client.post(
@@ -196,7 +196,7 @@ class TestDeviceFieldLock:
json=update_payload, json=update_payload,
headers=auth_headers headers=auth_headers
) )
# Update should succeed at API level but authoritative handler should prevent it # Update should succeed at API level but authoritative handler should prevent it
# The field update logic checks source in the database layer # The field update logic checks source in the database layer
# For now verify the API accepts the request # For now verify the API accepts the request
@@ -206,7 +206,7 @@ class TestDeviceFieldLock:
"""Lock some fields while leaving others unlocked.""" """Lock some fields while leaving others unlocked."""
# Create device # Create device
self.test_create_test_device(client, test_mac, auth_headers) self.test_create_test_device(client, test_mac, auth_headers)
# Lock only devName and devVendor # Lock only devName and devVendor
for field in ["devName", "devVendor"]: for field in ["devName", "devVendor"]:
payload = {"fieldName": field, "lock": True} payload = {"fieldName": field, "lock": True}
@@ -216,16 +216,16 @@ class TestDeviceFieldLock:
headers=auth_headers headers=auth_headers
) )
assert resp.status_code == 200 assert resp.status_code == 200
# Verify device state # Verify device state
resp = client.get(f"/device/{test_mac}", headers=auth_headers) resp = client.get(f"/device/{test_mac}", headers=auth_headers)
assert resp.status_code == 200 assert resp.status_code == 200
device_data = resp.json device_data = resp.json
# Locked fields should have LOCKED source # Locked fields should have LOCKED source
assert device_data.get("devNameSource") == "LOCKED" assert device_data.get("devNameSource") == "LOCKED"
assert device_data.get("devVendorSource") == "LOCKED" assert device_data.get("devVendorSource") == "LOCKED"
# Other fields should not be locked # Other fields should not be locked
assert device_data.get("devLastIPSource") != "LOCKED" assert device_data.get("devLastIPSource") != "LOCKED"
assert device_data.get("devFQDNSource") != "LOCKED" assert device_data.get("devFQDNSource") != "LOCKED"
@@ -234,9 +234,9 @@ class TestDeviceFieldLock:
"""Locking the same field multiple times should work.""" """Locking the same field multiple times should work."""
# Create device # Create device
self.test_create_test_device(client, test_mac, auth_headers) self.test_create_test_device(client, test_mac, auth_headers)
payload = {"fieldName": "devName", "lock": True} payload = {"fieldName": "devName", "lock": True}
# Lock once # Lock once
resp1 = client.post( resp1 = client.post(
f"/device/{test_mac}/field/lock", f"/device/{test_mac}/field/lock",
@@ -244,7 +244,7 @@ class TestDeviceFieldLock:
headers=auth_headers headers=auth_headers
) )
assert resp1.status_code == 200 assert resp1.status_code == 200
# Lock again # Lock again
resp2 = client.post( resp2 = client.post(
f"/device/{test_mac}/field/lock", f"/device/{test_mac}/field/lock",
@@ -269,10 +269,38 @@ class TestDeviceFieldLock:
class TestFieldLockIntegration: class TestFieldLockIntegration:
"""Integration tests for field locking with plugin overwrites.""" """Integration tests for field locking with plugin overwrites."""
def test_lock_unlock_normalizes_mac(self, test_mac):
"""Lock/unlock should normalize MAC addresses before DB updates."""
device_handler = DeviceInstance()
create_result = device_handler.setDeviceData(
test_mac,
{
"devName": "Original Name",
"devLastIP": "192.168.1.100",
"createNew": True,
},
)
assert create_result.get("success") is True
mac_variant = "aa-bb-cc-dd-ee-ff"
lock_result = device_handler.lockDeviceField(mac_variant, "devName")
assert lock_result.get("success") is True
device_data = device_handler.getDeviceData(test_mac)
assert device_data.get("devNameSource") == "LOCKED"
unlock_result = device_handler.unlockDeviceField(mac_variant, "devName")
assert unlock_result.get("success") is True
device_data = device_handler.getDeviceData(test_mac)
assert device_data.get("devNameSource") != "LOCKED"
def test_locked_field_blocks_plugin_overwrite(self, test_mac, auth_headers): def test_locked_field_blocks_plugin_overwrite(self, test_mac, auth_headers):
"""Verify locked fields prevent plugin source overwrites.""" """Verify locked fields prevent plugin source overwrites."""
device_handler = DeviceInstance() device_handler = DeviceInstance()
# Create device # Create device
create_result = device_handler.setDeviceData(test_mac, { create_result = device_handler.setDeviceData(test_mac, {
"devName": "Original Name", "devName": "Original Name",
@@ -280,10 +308,10 @@ class TestFieldLockIntegration:
"createNew": True "createNew": True
}) })
assert create_result.get("success") is True assert create_result.get("success") is True
# Lock the field # Lock the field
device_handler.updateDeviceColumn(test_mac, "devNameSource", "LOCKED") device_handler.updateDeviceColumn(test_mac, "devNameSource", "LOCKED")
# Try to overwrite with plugin source (this would be done by authoritative handler) # Try to overwrite with plugin source (this would be done by authoritative handler)
# For now, verify the source is stored correctly # For now, verify the source is stored correctly
device_data = device_handler.getDeviceData(test_mac) device_data = device_handler.getDeviceData(test_mac)
@@ -292,7 +320,7 @@ class TestFieldLockIntegration:
def test_field_source_tracking(self, test_mac, auth_headers): def test_field_source_tracking(self, test_mac, auth_headers):
"""Verify field source is tracked correctly.""" """Verify field source is tracked correctly."""
device_handler = DeviceInstance() device_handler = DeviceInstance()
# Create device # Create device
create_result = device_handler.setDeviceData(test_mac, { create_result = device_handler.setDeviceData(test_mac, {
"devName": "Test Device", "devName": "Test Device",
@@ -300,17 +328,17 @@ class TestFieldLockIntegration:
"createNew": True "createNew": True
}) })
assert create_result.get("success") is True assert create_result.get("success") is True
# Verify initial source # Verify initial source
device_data = device_handler.getDeviceData(test_mac) device_data = device_handler.getDeviceData(test_mac)
assert device_data.get("devNameSource") == "NEWDEV" assert device_data.get("devNameSource") == "NEWDEV"
# Update field (should set source to USER) # Update field (should set source to USER)
update_result = device_handler.setDeviceData(test_mac, { update_result = device_handler.setDeviceData(test_mac, {
"devName": "Updated Name" "devName": "Updated Name"
}) })
assert update_result.get("success") is True assert update_result.get("success") is True
# Verify source changed to USER # Verify source changed to USER
device_data = device_handler.getDeviceData(test_mac) device_data = device_handler.getDeviceData(test_mac)
assert device_data.get("devNameSource") == "USER" assert device_data.get("devNameSource") == "USER"