feat: Enhance authoritative field handling with new locking mechanisms and update tests

This commit is contained in:
Jokob @NetAlertX
2026-01-21 04:46:07 +00:00
parent 97e684dba4
commit 54d01f0a65
5 changed files with 309 additions and 79 deletions

View File

@@ -17,6 +17,7 @@ sys.path.extend([f"{INSTALL_PATH}/server"])
from logger import mylog # noqa: E402 [flake8 lint suppression]
from helper import get_setting_value # noqa: E402 [flake8 lint suppression]
from db.db_helper import row_to_json # noqa: E402 [flake8 lint suppression]
# Map of field to its source tracking field
@@ -149,8 +150,6 @@ def enforce_source_on_user_update(devMac, updates_dict, conn):
conn: Database connection object.
"""
cur = conn.cursor()
# Check if field has a corresponding source and should be updated
cur = conn.cursor()
try:
@@ -160,10 +159,9 @@ def enforce_source_on_user_update(devMac, updates_dict, conn):
device_columns = set()
updates_to_apply = {}
for field_name, new_value in updates_dict.items():
for field_name in updates_dict.keys():
if field_name in FIELD_SOURCE_MAP:
source_field = FIELD_SOURCE_MAP[field_name]
# User is updating this field, so mark it as USER
if not device_columns or source_field in device_columns:
updates_to_apply[source_field] = "USER"
@@ -179,17 +177,62 @@ def enforce_source_on_user_update(devMac, updates_dict, conn):
try:
cur.execute(sql, values)
conn.commit()
mylog(
"debug",
[f"[enforce_source_on_user_update] Updated sources for {devMac}: {updates_to_apply}"],
)
except Exception as e:
mylog("none", [f"[enforce_source_on_user_update] ERROR: {e}"])
conn.rollback()
raise
def get_locked_field_overrides(devMac, updates_dict, conn):
"""
For user updates, restore values for any fields whose *Source is LOCKED.
Args:
devMac: The MAC address of the device being updated.
updates_dict: Dict of field -> value being updated.
conn: Database connection object.
Returns:
tuple(set, dict): (locked_fields, overrides)
locked_fields: set of field names that are locked
overrides: dict of field -> existing value to preserve
"""
tracked_fields = [field for field in updates_dict.keys() if field in FIELD_SOURCE_MAP]
if not tracked_fields:
return set(), {}
select_columns = tracked_fields + [FIELD_SOURCE_MAP[field] for field in tracked_fields]
select_clause = ", ".join(select_columns)
cur = conn.cursor()
try:
cur.execute(
f"SELECT {select_clause} FROM Devices WHERE devMac=?",
(devMac,),
)
row = cur.fetchone()
except Exception:
row = None
if not row:
return set(), {}
row_data = row_to_json(list(row.keys()), row)
locked_fields = set()
overrides = {}
for field in tracked_fields:
source_field = FIELD_SOURCE_MAP[field]
if row_data.get(source_field) == "LOCKED":
locked_fields.add(field)
overrides[field] = row_data.get(field) or ""
return locked_fields, overrides
def lock_field(devMac, field_name, conn):
"""
Lock a field so it won't be overwritten by plugins.