feat: authoritative plugin fields

Signed-off-by: jokob-sk <jokob.sk@gmail.com>
This commit is contained in:
jokob-sk
2026-01-25 11:40:29 +11:00
parent 27f7bfd129
commit 96e4909bf0
8 changed files with 364 additions and 39 deletions

View File

@@ -72,7 +72,7 @@ from .openapi.schemas import ( # noqa: E402 [flake8 lint suppression]
BaseResponse, DeviceTotalsResponse,
DeleteDevicesRequest, DeviceImportRequest,
DeviceImportResponse, UpdateDeviceColumnRequest,
LockDeviceFieldRequest,
LockDeviceFieldRequest, UnlockDeviceFieldsRequest,
CopyDeviceRequest, TriggerScanRequest,
OpenPortsRequest,
OpenPortsResponse, WakeOnLanRequest,
@@ -445,6 +445,10 @@ def api_device_update_column(mac, payload=None):
return jsonify(result)
# --------------------------
# Field sources and locking
# --------------------------
@app.route("/device/<mac>/field/lock", methods=["POST"])
@validate_request(
operation_id="lock_device_field",
@@ -496,6 +500,44 @@ def api_device_field_lock(mac, payload=None):
return jsonify({"success": False, "error": str(e)}), 500
@app.route("/devices/fields/unlock", methods=["POST"])
@validate_request(
operation_id="unlock_device_fields",
summary="Unlock/Clear Device Fields",
description=(
"Unlock device fields (clear LOCKED/USER sources) or clear all sources. "
"Can target one device or all devices, and one or multiple fields."
),
request_model=UnlockDeviceFieldsRequest,
response_model=BaseResponse,
tags=["devices"],
auth_callable=is_authorized
)
def api_device_fields_unlock(payload=None):
"""
Unlock or clear fields for one device or all devices.
"""
data = request.get_json() or {}
mac = data.get("mac")
fields = data.get("fields")
if fields and not isinstance(fields, list):
return jsonify({
"success": False,
"error": "fields must be a list of field names"
}), 400
clear_all = bool(data.get("clearAll", False))
device_handler = DeviceInstance()
# Call wrapper directly — it handles validation and normalization
result = device_handler.unlockFields(mac=mac, fields=fields, clear_all=clear_all)
return jsonify(result)
# --------------------------
# Devices Collections
# --------------------------
@app.route('/mcp/sse/device/<mac>/set-alias', methods=['POST'])
@app.route('/device/<mac>/set-alias', methods=['POST'])
@validate_request(
@@ -553,9 +595,6 @@ def api_device_open_ports(payload=None):
return jsonify({"success": True, "target": target, "open_ports": open_ports})
# --------------------------
# Devices Collections
# --------------------------
@app.route("/devices", methods=["GET"])
@validate_request(
operation_id="get_all_devices",

View File

@@ -15,7 +15,7 @@ from __future__ import annotations
import re
import ipaddress
from typing import Optional, List, Literal, Any, Dict
from typing import Optional, List, Literal, Any, Dict, Union
from pydantic import BaseModel, Field, field_validator, model_validator, ConfigDict, RootModel
# Internal helper imports
@@ -279,6 +279,22 @@ class LockDeviceFieldRequest(BaseModel):
lock: bool = Field(True, description="True to lock the field, False to unlock")
class UnlockDeviceFieldsRequest(BaseModel):
"""Request to unlock/clear device fields for one or multiple devices."""
mac: Optional[Union[str, List[str]]] = Field(
None,
description="Single MAC, list of MACs, or None to target all devices"
)
fields: Optional[List[str]] = Field(
None,
description="List of field names to unlock. If omitted, all tracked fields will be unlocked"
)
clear_all: bool = Field(
False,
description="True to clear all sources, False to clear only LOCKED/USER"
)
class DeviceUpdateRequest(BaseModel):
"""Request to update device fields (create/update)."""
model_config = ConfigDict(extra="allow")

View File

@@ -18,6 +18,7 @@ sys.path.extend([f"{INSTALL_PATH}/server"])
from logger import mylog # noqa: E402 [flake8 lint suppression]
from helper import get_setting_value # noqa: E402 [flake8 lint suppression]
from db.db_helper import row_to_json # noqa: E402 [flake8 lint suppression]
from plugin_helper import normalize_mac # noqa: E402 [flake8 lint suppression]
# Map of field to its source tracking field
@@ -287,27 +288,28 @@ def lock_field(devMac, field_name, conn):
"""
Lock a field so it won't be overwritten by plugins.
Args:
devMac: The MAC address of the device.
field_name: The field to lock.
conn: Database connection object.
Returns:
dict: {"success": bool, "error": str|None}
"""
if field_name not in FIELD_SOURCE_MAP:
mylog("debug", [f"[lock_field] Field {field_name} does not support locking"])
return
msg = f"Field {field_name} does not support locking"
mylog("debug", [f"[lock_field] {msg}"])
return {"success": False, "error": msg}
source_field = FIELD_SOURCE_MAP[field_name]
cur = conn.cursor()
try:
cur.execute("PRAGMA table_info(Devices)")
device_columns = {row["name"] for row in cur.fetchall()}
except Exception:
except Exception as e:
device_columns = set()
mylog("none", [f"[lock_field] Failed to get table info: {e}"])
if device_columns and source_field not in device_columns:
mylog("debug", [f"[lock_field] Source column {source_field} missing for {field_name}"])
return
msg = f"Source column {source_field} missing for {field_name}"
mylog("debug", [f"[lock_field] {msg}"])
return {"success": False, "error": msg}
sql = f"UPDATE Devices SET {source_field}='LOCKED' WHERE devMac = ?"
@@ -315,46 +317,128 @@ def lock_field(devMac, field_name, conn):
cur.execute(sql, (devMac,))
conn.commit()
mylog("debug", [f"[lock_field] Locked {field_name} for {devMac}"])
return {"success": True, "error": None}
except Exception as e:
mylog("none", [f"[lock_field] ERROR: {e}"])
conn.rollback()
raise
try:
conn.rollback()
except Exception:
pass
return {"success": False, "error": str(e)}
def unlock_field(devMac, field_name, conn):
"""
Unlock a field so plugins can overwrite it again.
Args:
devMac: The MAC address of the device.
field_name: The field to unlock.
conn: Database connection object.
Returns:
dict: {"success": bool, "error": str|None}
"""
if field_name not in FIELD_SOURCE_MAP:
mylog("debug", [f"[unlock_field] Field {field_name} does not support unlocking"])
return
msg = f"Field {field_name} does not support unlocking"
mylog("debug", [f"[unlock_field] {msg}"])
return {"success": False, "error": msg}
source_field = FIELD_SOURCE_MAP[field_name]
cur = conn.cursor()
try:
cur.execute("PRAGMA table_info(Devices)")
device_columns = {row["name"] for row in cur.fetchall()}
except Exception:
except Exception as e:
device_columns = set()
mylog("none", [f"[unlock_field] Failed to get table info: {e}"])
if device_columns and source_field not in device_columns:
mylog("debug", [f"[unlock_field] Source column {source_field} missing for {field_name}"])
return
msg = f"Source column {source_field} missing for {field_name}"
mylog("debug", [f"[unlock_field] {msg}"])
return {"success": False, "error": msg}
# Unlock by resetting to empty (allows overwrite)
sql = f"UPDATE Devices SET {source_field}='' WHERE devMac = ?"
try:
cur.execute(sql, (devMac,))
conn.commit()
mylog("debug", [f"[unlock_field] Unlocked {field_name} for {devMac}"])
return {"success": True, "error": None}
except Exception as e:
mylog("none", [f"[unlock_field] ERROR: {e}"])
conn.rollback()
raise
try:
conn.rollback()
except Exception:
pass
return {"success": False, "error": str(e)}
def unlock_fields(conn, mac=None, fields=None, clear_all=False):
"""
Unlock or clear source fields for one device, multiple devices, or all devices.
Args:
conn: Database connection object.
mac: Device MAC address (string) or list of MACs. If None, operate on all devices.
fields: Optional list of fields to unlock. If None, use all tracked fields.
clear_all: If True, clear all values in source fields; otherwise, only clear LOCKED/USER.
Returns:
dict: {
"success": bool,
"error": str|None,
"devicesAffected": int,
"fieldsAffected": list
}
"""
target_fields = fields if fields else list(FIELD_SOURCE_MAP.keys())
if not target_fields:
return {"success": False, "error": "No fields to process", "devicesAffected": 0, "fieldsAffected": []}
try:
cur = conn.cursor()
fields_set_clauses = []
for field in target_fields:
source_field = FIELD_SOURCE_MAP[field]
if clear_all:
fields_set_clauses.append(f"{source_field}=''")
else:
fields_set_clauses.append(
f"{source_field}=CASE WHEN {source_field} IN ('LOCKED','USER') THEN '' ELSE {source_field} END"
)
set_clause = ", ".join(fields_set_clauses)
if mac:
# mac can be a single string or a list
macs = mac if isinstance(mac, list) else [mac]
normalized_macs = [normalize_mac(m) for m in macs]
placeholders = ",".join("?" for _ in normalized_macs)
sql = f"UPDATE Devices SET {set_clause} WHERE devMac IN ({placeholders})"
cur.execute(sql, normalized_macs)
else:
# All devices
sql = f"UPDATE Devices SET {set_clause}"
cur.execute(sql)
conn.commit()
return {
"success": True,
"error": None,
"devicesAffected": cur.rowcount,
"fieldsAffected": target_fields,
}
except Exception as e:
try:
conn.rollback()
except Exception:
pass
return {
"success": False,
"error": str(e),
"devicesAffected": 0,
"fieldsAffected": [],
}
finally:
conn.close()

View File

@@ -15,6 +15,7 @@ from db.authoritative_handler import (
lock_field,
unlock_field,
FIELD_SOURCE_MAP,
unlock_fields
)
from helper import is_random_mac, get_setting_value
from utils.datetime_utils import timeNowDB, format_date
@@ -804,10 +805,12 @@ class DeviceInstance:
mac_normalized = normalize_mac(mac)
conn = get_temp_db_connection()
try:
lock_field(mac_normalized, field_name, conn)
return {"success": True, "message": f"Field {field_name} locked"}
result = lock_field(mac_normalized, field_name, conn)
# Include field name in response
result["fieldName"] = field_name
return result
except Exception as e:
return {"success": False, "error": str(e)}
return {"success": False, "error": str(e), "fieldName": field_name}
finally:
conn.close()
@@ -819,13 +822,55 @@ class DeviceInstance:
mac_normalized = normalize_mac(mac)
conn = get_temp_db_connection()
try:
unlock_field(mac_normalized, field_name, conn)
return {"success": True, "message": f"Field {field_name} unlocked"}
result = unlock_field(mac_normalized, field_name, conn)
# Include field name in response
result["fieldName"] = field_name
return result
except Exception as e:
return {"success": False, "error": str(e)}
return {"success": False, "error": str(e), "fieldName": field_name}
finally:
conn.close()
def unlockFields(self, mac=None, fields=None, clear_all=False):
"""
Wrapper to unlock one field, multiple fields, or all fields of a device or all devices.
Args:
mac: Optional MAC address of a single device (string) or multiple devices (list of strings).
If None, the operation applies to all devices.
fields: Optional list of field names to unlock. If None, all tracked fields are unlocked.
clear_all: If True, clear all values in the corresponding source fields.
If False, only clear fields whose source is 'LOCKED' or 'USER'.
Returns:
dict: {
"success": bool,
"error": str|None,
"devicesAffected": int,
"fieldsAffected": list
}
"""
# If no fields specified, unlock all tracked fields
if fields is None:
fields_to_unlock = list(FIELD_SOURCE_MAP.keys())
else:
# Validate fields
invalid_fields = [f for f in fields if f not in FIELD_SOURCE_MAP]
if invalid_fields:
return {
"success": False,
"error": f"Invalid fields: {', '.join(invalid_fields)}",
"devicesAffected": 0,
"fieldsAffected": []
}
fields_to_unlock = fields
conn = get_temp_db_connection()
result = unlock_fields(conn, mac=mac, fields=fields_to_unlock, clear_all=clear_all)
conn.close()
return result
def copyDevice(self, mac_from, mac_to):
"""Copy a device entry from one MAC to another."""
conn = get_temp_db_connection()