PLG: Implement selective recording for Plugins_History to prevent unbounded growth
Some checks failed
🐳 ⚠ docker-unsafe from next_release branch / docker_dev_unsafe (push) Has been cancelled

This commit is contained in:
Jokob @NetAlertX
2026-04-12 23:09:34 +00:00
parent b0c687a171
commit 8abecb7a0d
3 changed files with 424 additions and 2 deletions

View File

@@ -786,6 +786,10 @@ def process_plugin_events(db, plugin, plugEventsArr):
pluginEvents[index].status = "watched-not-changed"
index += 1
# Track objects whose state actually changed this cycle
# (only these will be recorded in Plugins_History)
changed_this_cycle = set()
# Loop thru events and check if previously available objects are missing
for tmpObj in pluginObjects:
isMissing = True
@@ -799,6 +803,7 @@ def process_plugin_events(db, plugin, plugEventsArr):
if tmpObj.status != "missing-in-last-scan":
tmpObj.changed = timeNowUTC()
tmpObj.status = "missing-in-last-scan"
changed_this_cycle.add(tmpObj.idsHash)
# mylog('debug', [f'[Plugins] Missing from last scan (PrimaryID | SecondaryID): {tmpObj.primaryId} | {tmpObj.secondaryId}'])
# Merge existing plugin objects with newly discovered ones and update existing ones with new values
@@ -807,10 +812,14 @@ def process_plugin_events(db, plugin, plugEventsArr):
if tmpObjFromEvent.status == "not-processed":
# This is a new object as it was not discovered as "exists" previously
tmpObjFromEvent.status = "new"
changed_this_cycle.add(tmpObjFromEvent.idsHash)
pluginObjects.append(tmpObjFromEvent)
# update data of existing objects
else:
if tmpObjFromEvent.status == "watched-changed":
changed_this_cycle.add(tmpObjFromEvent.idsHash)
index = 0
for plugObj in pluginObjects:
# find corresponding object for the event and merge
@@ -871,8 +880,9 @@ def process_plugin_events(db, plugin, plugEventsArr):
if plugObj.status in statuses_to_report_on:
events_to_insert.append(values)
# combine all DB insert and update events into one for history
history_to_insert.append(values)
# Only record history for objects that actually changed this cycle
if plugObj.idsHash in changed_this_cycle:
history_to_insert.append(values)
mylog("debug", f"[Plugins] pluginEvents count: {len(pluginEvents)}")
mylog("debug", f"[Plugins] pluginObjects count: {len(pluginObjects)}")

View File

@@ -6,6 +6,7 @@ Import from any test subdirectory with:
import sys, os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from db_test_helpers import make_db, insert_device, minutes_ago, DummyDB, down_event_macs, make_device_dict, sync_insert_devices
from db_test_helpers import make_plugin_db, make_plugin_dict, make_plugin_event_row, seed_plugin_object, plugin_history_rows, plugin_objects_rows, PluginFakeDB
"""
import sqlite3
@@ -351,3 +352,201 @@ class DummyDB:
def commitDB(self) -> None:
self._conn.commit()
# ---------------------------------------------------------------------------
# Plugin tables DDL & helpers (used by test/server/test_plugin_history_filtering.py)
# ---------------------------------------------------------------------------
CREATE_PLUGINS_OBJECTS = """
CREATE TABLE IF NOT EXISTS Plugins_Objects(
"index" INTEGER PRIMARY KEY AUTOINCREMENT,
plugin TEXT NOT NULL,
objectPrimaryId TEXT NOT NULL,
objectSecondaryId TEXT NOT NULL,
dateTimeCreated TEXT NOT NULL,
dateTimeChanged TEXT NOT NULL,
watchedValue1 TEXT NOT NULL,
watchedValue2 TEXT NOT NULL,
watchedValue3 TEXT NOT NULL,
watchedValue4 TEXT NOT NULL,
"status" TEXT NOT NULL,
extra TEXT NOT NULL,
userData TEXT NOT NULL,
foreignKey TEXT NOT NULL,
syncHubNodeName TEXT,
helpVal1 TEXT,
helpVal2 TEXT,
helpVal3 TEXT,
helpVal4 TEXT,
objectGuid TEXT
);
"""
CREATE_PLUGINS_EVENTS = """
CREATE TABLE IF NOT EXISTS Plugins_Events(
"index" INTEGER PRIMARY KEY AUTOINCREMENT,
plugin TEXT NOT NULL,
objectPrimaryId TEXT NOT NULL,
objectSecondaryId TEXT NOT NULL,
dateTimeCreated TEXT NOT NULL,
dateTimeChanged TEXT NOT NULL,
watchedValue1 TEXT NOT NULL,
watchedValue2 TEXT NOT NULL,
watchedValue3 TEXT NOT NULL,
watchedValue4 TEXT NOT NULL,
"status" TEXT NOT NULL,
extra TEXT NOT NULL,
userData TEXT NOT NULL,
foreignKey TEXT NOT NULL,
syncHubNodeName TEXT,
helpVal1 TEXT,
helpVal2 TEXT,
helpVal3 TEXT,
helpVal4 TEXT,
objectGuid TEXT
);
"""
CREATE_PLUGINS_HISTORY = """
CREATE TABLE IF NOT EXISTS Plugins_History(
"index" INTEGER PRIMARY KEY AUTOINCREMENT,
plugin TEXT NOT NULL,
objectPrimaryId TEXT NOT NULL,
objectSecondaryId TEXT NOT NULL,
dateTimeCreated TEXT NOT NULL,
dateTimeChanged TEXT NOT NULL,
watchedValue1 TEXT NOT NULL,
watchedValue2 TEXT NOT NULL,
watchedValue3 TEXT NOT NULL,
watchedValue4 TEXT NOT NULL,
"status" TEXT NOT NULL,
extra TEXT NOT NULL,
userData TEXT NOT NULL,
foreignKey TEXT NOT NULL,
syncHubNodeName TEXT,
helpVal1 TEXT,
helpVal2 TEXT,
helpVal3 TEXT,
helpVal4 TEXT,
objectGuid TEXT
);
"""
class PluginFakeSQL:
"""Wraps a sqlite3.Cursor to provide the interface plugin.py expects."""
def __init__(self, cursor):
self._cursor = cursor
def execute(self, sql, params=None):
if params:
return self._cursor.execute(sql, params)
return self._cursor.execute(sql)
def executemany(self, sql, params_list):
return self._cursor.executemany(sql, params_list)
class PluginFakeDB:
"""Minimal DB facade expected by process_plugin_events."""
def __init__(self, conn):
self.sql_connection = conn
self.sql = PluginFakeSQL(conn.cursor())
def get_sql_array(self, query):
cur = self.sql_connection.cursor()
cur.execute(query)
return cur.fetchall()
def commitDB(self):
self.sql_connection.commit()
def make_plugin_db() -> tuple:
"""
Return a (PluginFakeDB, connection) backed by an in-memory SQLite
database with all three plugin tables created.
"""
conn = sqlite3.connect(":memory:")
conn.executescript(
CREATE_PLUGINS_OBJECTS + CREATE_PLUGINS_EVENTS + CREATE_PLUGINS_HISTORY
)
conn.commit()
db = PluginFakeDB(conn)
return db, conn
def make_plugin_dict(prefix: str, watched_columns=None) -> dict:
"""Return a minimal plugin dict compatible with process_plugin_events."""
return {
"unique_prefix": prefix,
"settings": [
{
"function": "WATCH",
"value": watched_columns or ["watchedValue1"],
},
],
}
def make_plugin_event_row(prefix: str, primary_id: str, secondary_id="sec",
watched1="val1", watched2="", watched3="",
watched4="", changed="2026-01-01 00:00:00",
extra="", user_data="", foreign_key="",
status="not-processed"):
"""Build a tuple mimicking a raw plugin output row (19 columns + index)."""
return (
0, # index (placeholder, not used for events)
prefix, # plugin
primary_id,
secondary_id,
changed, # dateTimeCreated
changed, # dateTimeChanged
watched1,
watched2,
watched3,
watched4,
status,
extra,
user_data,
foreign_key,
None, # syncHubNodeName
None, # helpVal1
None, # helpVal2
None, # helpVal3
None, # helpVal4
)
def seed_plugin_object(cur, prefix: str, primary_id: str,
secondary_id="sec", watched1="val1",
status="watched-not-changed",
changed="2026-01-01 00:00:00"):
"""Insert a row into Plugins_Objects to simulate a pre-existing object."""
cur.execute(
"""INSERT INTO Plugins_Objects
(plugin, objectPrimaryId, objectSecondaryId, dateTimeCreated,
dateTimeChanged, watchedValue1, watchedValue2, watchedValue3,
watchedValue4, status, extra, userData, foreignKey)
VALUES (?, ?, ?, ?, ?, ?, '', '', '', ?, '', '', '')""",
(prefix, primary_id, secondary_id, changed, changed, watched1, status),
)
def plugin_history_rows(conn, prefix: str):
"""Return all Plugins_History rows for a given plugin prefix."""
cur = conn.cursor()
cur.execute(
"SELECT * FROM Plugins_History WHERE plugin = ?", (prefix,)
)
return cur.fetchall()
def plugin_objects_rows(conn, prefix: str):
"""Return all Plugins_Objects rows for a given plugin prefix."""
cur = conn.cursor()
cur.execute(
"SELECT * FROM Plugins_Objects WHERE plugin = ?", (prefix,)
)
return cur.fetchall()

View File

@@ -0,0 +1,213 @@
"""
Tests for the Plugins_History selective recording introduced to prevent
unbounded table growth.
Verifies that process_plugin_events() only writes history rows for objects
whose state actually changed in the current cycle:
- new objects
- watched-changed objects
- missing-in-last-scan (first transition only)
Objects that are watched-not-changed or already missing should NOT generate
history entries.
"""
import sys
import os
import pytest
# ---------------------------------------------------------------------------
# Path setup
# ---------------------------------------------------------------------------
INSTALL_PATH = os.getenv("NETALERTX_APP", "/app")
sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
# Test helpers (shared DDL, fake DB, factories)
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from db_test_helpers import ( # noqa: E402
make_plugin_db,
make_plugin_dict,
make_plugin_event_row,
seed_plugin_object,
plugin_history_rows,
plugin_objects_rows,
)
from plugin import process_plugin_events # noqa: E402
PREFIX = "TESTPLG"
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture
def plugin_db():
"""Yield a (PluginFakeDB, connection) backed by an in-memory SQLite database."""
db, conn = make_plugin_db()
yield db, conn
conn.close()
def _no_report_on(key):
"""Monkeypatch target: return empty REPORT_ON so no events are generated."""
return [] if key.endswith("_REPORT_ON") else ""
# ---------------------------------------------------------------------------
# Tests
# ---------------------------------------------------------------------------
class TestHistoryOnlyRecordsChanges:
"""Core assertion: unchanged objects must NOT appear in Plugins_History."""
def test_new_object_recorded_in_history(self, plugin_db, monkeypatch):
"""A brand-new object should produce exactly one history row."""
db, conn = plugin_db
monkeypatch.setattr("plugin.get_setting_value", _no_report_on)
plugin = make_plugin_dict(PREFIX)
events = [make_plugin_event_row(PREFIX, "device_A")]
process_plugin_events(db, plugin, events)
rows = plugin_history_rows(conn, PREFIX)
assert len(rows) == 1
assert rows[0][2] == "device_A" # objectPrimaryId
def test_unchanged_object_not_recorded(self, plugin_db, monkeypatch):
"""An object with watched-not-changed should NOT appear in history."""
db, conn = plugin_db
monkeypatch.setattr("plugin.get_setting_value", _no_report_on)
cur = conn.cursor()
seed_plugin_object(cur, PREFIX, "device_A", watched1="val1",
status="watched-not-changed")
conn.commit()
plugin = make_plugin_dict(PREFIX)
events = [make_plugin_event_row(PREFIX, "device_A", watched1="val1")]
process_plugin_events(db, plugin, events)
rows = plugin_history_rows(conn, PREFIX)
assert len(rows) == 0, (
"watched-not-changed objects should not generate history rows"
)
def test_watched_changed_recorded(self, plugin_db, monkeypatch):
"""An object whose watched column changed should appear in history."""
db, conn = plugin_db
monkeypatch.setattr("plugin.get_setting_value", _no_report_on)
cur = conn.cursor()
seed_plugin_object(cur, PREFIX, "device_A", watched1="old_value",
status="watched-not-changed")
conn.commit()
plugin = make_plugin_dict(PREFIX)
events = [make_plugin_event_row(PREFIX, "device_A", watched1="new_value")]
process_plugin_events(db, plugin, events)
rows = plugin_history_rows(conn, PREFIX)
assert len(rows) == 1
assert rows[0][2] == "device_A"
def test_missing_first_time_recorded(self, plugin_db, monkeypatch):
"""An object going missing for the first time should appear in history."""
db, conn = plugin_db
monkeypatch.setattr("plugin.get_setting_value", _no_report_on)
cur = conn.cursor()
seed_plugin_object(cur, PREFIX, "device_A", status="watched-not-changed")
conn.commit()
plugin = make_plugin_dict(PREFIX)
events = [] # No events reported — device_A is now missing
process_plugin_events(db, plugin, events)
rows = plugin_history_rows(conn, PREFIX)
assert len(rows) == 1
assert rows[0][2] == "device_A"
def test_already_missing_not_re_recorded(self, plugin_db, monkeypatch):
"""An object already marked missing-in-last-scan should NOT produce
another history row on subsequent runs."""
db, conn = plugin_db
monkeypatch.setattr("plugin.get_setting_value", _no_report_on)
cur = conn.cursor()
seed_plugin_object(cur, PREFIX, "device_A",
status="missing-in-last-scan")
conn.commit()
plugin = make_plugin_dict(PREFIX)
events = [] # still missing
process_plugin_events(db, plugin, events)
rows = plugin_history_rows(conn, PREFIX)
assert len(rows) == 0, (
"already-missing objects should not generate additional history rows"
)
def test_mixed_scenario(self, plugin_db, monkeypatch):
"""Simulate a realistic mixed run: new + unchanged + changed + missing."""
db, conn = plugin_db
monkeypatch.setattr("plugin.get_setting_value", _no_report_on)
cur = conn.cursor()
seed_plugin_object(cur, PREFIX, "unchanged", watched1="same",
status="watched-not-changed")
seed_plugin_object(cur, PREFIX, "will_change", watched1="old",
status="watched-not-changed")
seed_plugin_object(cur, PREFIX, "will_vanish",
status="watched-not-changed")
seed_plugin_object(cur, PREFIX, "already_gone",
status="missing-in-last-scan")
conn.commit()
plugin = make_plugin_dict(PREFIX)
events = [
make_plugin_event_row(PREFIX, "brand_new"), # new
make_plugin_event_row(PREFIX, "unchanged", watched1="same"), # no change
make_plugin_event_row(PREFIX, "will_change", watched1="new"), # changed
# will_vanish not reported → first-time missing
# already_gone not reported → still missing (no history)
]
process_plugin_events(db, plugin, events)
rows = plugin_history_rows(conn, PREFIX)
recorded_ids = {r[2] for r in rows} # objectPrimaryId
assert "brand_new" in recorded_ids, "new object should be in history"
assert "will_change" in recorded_ids, "changed object should be in history"
assert "will_vanish" in recorded_ids, "first-time missing should be in history"
assert "unchanged" not in recorded_ids, "unchanged should NOT be in history"
assert "already_gone" not in recorded_ids, "already-missing should NOT be in history"
assert len(rows) == 3
def test_objects_table_still_updated_for_unchanged(self, plugin_db, monkeypatch):
"""Even though history is skipped, Plugins_Objects must still be updated
for unchanged objects (no regression)."""
db, conn = plugin_db
monkeypatch.setattr("plugin.get_setting_value", _no_report_on)
cur = conn.cursor()
seed_plugin_object(cur, PREFIX, "device_A", watched1="val1",
status="watched-not-changed")
conn.commit()
plugin = make_plugin_dict(PREFIX)
events = [make_plugin_event_row(PREFIX, "device_A", watched1="val1")]
process_plugin_events(db, plugin, events)
objs = plugin_objects_rows(conn, PREFIX)
assert len(objs) == 1, "Plugins_Objects should still have the object"