BE: Events deduplication and uniqueness

Signed-off-by: jokob-sk <jokob.sk@gmail.com>
This commit is contained in:
jokob-sk
2026-02-15 13:59:15 +11:00
parent 46781ed71a
commit 9676111ceb
8 changed files with 62 additions and 46 deletions

View File

@@ -219,6 +219,13 @@ CREATE INDEX IDX_dev_Favorite ON Devices (devFavorite);
CREATE INDEX IDX_dev_LastIP ON Devices (devLastIP); CREATE INDEX IDX_dev_LastIP ON Devices (devLastIP);
CREATE INDEX IDX_dev_NewDevice ON Devices (devIsNew); CREATE INDEX IDX_dev_NewDevice ON Devices (devIsNew);
CREATE INDEX IDX_dev_Archived ON Devices (devIsArchived); CREATE INDEX IDX_dev_Archived ON Devices (devIsArchived);
CREATE UNIQUE INDEX IF NOT EXISTS idx_events_unique
ON Events (
eve_MAC,
eve_IP,
eve_EventType,
eve_DateTime
);
CREATE VIEW Events_Devices AS CREATE VIEW Events_Devices AS
SELECT * SELECT *
FROM Events FROM Events

View File

@@ -69,11 +69,9 @@ def cleanup_database(
mylog("verbose", [f"[{pluginName}] Upkeep Database: {dbPath}"]) mylog("verbose", [f"[{pluginName}] Upkeep Database: {dbPath}"])
# Connect to the App database
conn = get_temp_db_connection() conn = get_temp_db_connection()
cursor = conn.cursor() cursor = conn.cursor()
# Reindwex to prevent fails due to corruption
try: try:
cursor.execute("REINDEX;") cursor.execute("REINDEX;")
mylog("verbose", [f"[{pluginName}] REINDEX completed"]) mylog("verbose", [f"[{pluginName}] REINDEX completed"])
@@ -82,25 +80,25 @@ def cleanup_database(
# ----------------------------------------------------- # -----------------------------------------------------
# Cleanup Online History # Cleanup Online History
mylog("verbose", [f"[{pluginName}] Online_History: Delete all but keep latest 150 entries"],) mylog("verbose", [f"[{pluginName}] Online_History: Delete all but keep latest 150 entries"])
cursor.execute( cursor.execute(
"""DELETE from Online_History where "Index" not in ( """DELETE from Online_History where "Index" not in (
SELECT "Index" from Online_History SELECT "Index" from Online_History
order by Scan_Date desc limit 150)""" order by Scan_Date desc limit 150)"""
) )
mylog("verbose", [f"[{pluginName}] Online_History deleted rows: {cursor.rowcount}"])
# ----------------------------------------------------- # -----------------------------------------------------
# Cleanup Events # Cleanup Events
mylog("verbose", f"[{pluginName}] Events: Delete all older than {str(DAYS_TO_KEEP_EVENTS)} days (DAYS_TO_KEEP_EVENTS setting)") mylog("verbose", f"[{pluginName}] Events: Delete all older than {str(DAYS_TO_KEEP_EVENTS)} days (DAYS_TO_KEEP_EVENTS setting)")
sql = f"""DELETE FROM Events WHERE eve_DateTime <= date('now', '-{str(DAYS_TO_KEEP_EVENTS)} day')""" sql = f"""DELETE FROM Events WHERE eve_DateTime <= date('now', '-{str(DAYS_TO_KEEP_EVENTS)} day')"""
mylog("verbose", [f"[{pluginName}] SQL : {sql}"]) mylog("verbose", [f"[{pluginName}] SQL : {sql}"])
cursor.execute(sql) cursor.execute(sql)
# ----------------------------------------------------- mylog("verbose", [f"[{pluginName}] Events deleted rows: {cursor.rowcount}"])
# Trim Plugins_History entries to less than PLUGINS_KEEP_HIST setting per unique "Plugin" column entry
mylog("verbose", f"[{pluginName}] Plugins_History: Trim Plugins_History entries to less than {str(PLUGINS_KEEP_HIST)} per Plugin (PLUGINS_KEEP_HIST setting)")
# Build the SQL query to delete entries that exceed the limit per unique "Plugin" column entry # -----------------------------------------------------
# Plugins_History
mylog("verbose", f"[{pluginName}] Plugins_History: Trim to {str(PLUGINS_KEEP_HIST)} per Plugin")
delete_query = f"""DELETE FROM Plugins_History delete_query = f"""DELETE FROM Plugins_History
WHERE "Index" NOT IN ( WHERE "Index" NOT IN (
SELECT "Index" SELECT "Index"
@@ -111,17 +109,13 @@ def cleanup_database(
) AS ranked_objects ) AS ranked_objects
WHERE row_num <= {str(PLUGINS_KEEP_HIST)} WHERE row_num <= {str(PLUGINS_KEEP_HIST)}
);""" );"""
cursor.execute(delete_query) cursor.execute(delete_query)
mylog("verbose", [f"[{pluginName}] Plugins_History deleted rows: {cursor.rowcount}"])
# ----------------------------------------------------- # -----------------------------------------------------
# Trim Notifications entries to less than DBCLNP_NOTIFI_HIST setting # Notifications
histCount = get_setting_value("DBCLNP_NOTIFI_HIST") histCount = get_setting_value("DBCLNP_NOTIFI_HIST")
mylog("verbose", f"[{pluginName}] Notifications: Trim to {histCount}")
mylog("verbose", f"[{pluginName}] Plugins_History: Trim Notifications entries to less than {histCount}")
# Build the SQL query to delete entries
delete_query = f"""DELETE FROM Notifications delete_query = f"""DELETE FROM Notifications
WHERE "Index" NOT IN ( WHERE "Index" NOT IN (
SELECT "Index" SELECT "Index"
@@ -132,16 +126,13 @@ def cleanup_database(
) AS ranked_objects ) AS ranked_objects
WHERE row_num <= {histCount} WHERE row_num <= {histCount}
);""" );"""
cursor.execute(delete_query) cursor.execute(delete_query)
mylog("verbose", [f"[{pluginName}] Notifications deleted rows: {cursor.rowcount}"])
# ----------------------------------------------------- # -----------------------------------------------------
# Trim Workflow entries to less than WORKFLOWS_AppEvents_hist setting # AppEvents
histCount = get_setting_value("WORKFLOWS_AppEvents_hist") histCount = get_setting_value("WORKFLOWS_AppEvents_hist")
mylog("verbose", [f"[{pluginName}] Trim AppEvents to less than {histCount}"]) mylog("verbose", [f"[{pluginName}] Trim AppEvents to less than {histCount}"])
# Build the SQL query to delete entries
delete_query = f"""DELETE FROM AppEvents delete_query = f"""DELETE FROM AppEvents
WHERE "Index" NOT IN ( WHERE "Index" NOT IN (
SELECT "Index" SELECT "Index"
@@ -152,38 +143,40 @@ def cleanup_database(
) AS ranked_objects ) AS ranked_objects
WHERE row_num <= {histCount} WHERE row_num <= {histCount}
);""" );"""
cursor.execute(delete_query) cursor.execute(delete_query)
mylog("verbose", [f"[{pluginName}] AppEvents deleted rows: {cursor.rowcount}"])
conn.commit() conn.commit()
# ----------------------------------------------------- # -----------------------------------------------------
# Cleanup New Devices # Cleanup New Devices
if HRS_TO_KEEP_NEWDEV != 0: if HRS_TO_KEEP_NEWDEV != 0:
mylog("verbose", f"[{pluginName}] Devices: Delete all New Devices older than {str(HRS_TO_KEEP_NEWDEV)} hours (HRS_TO_KEEP_NEWDEV setting)") mylog("verbose", f"[{pluginName}] Devices: Delete New Devices older than {str(HRS_TO_KEEP_NEWDEV)} hours")
query = f"""DELETE FROM Devices WHERE devIsNew = 1 AND devFirstConnection < date('now', '-{str(HRS_TO_KEEP_NEWDEV)} hour')""" query = f"""DELETE FROM Devices WHERE devIsNew = 1 AND devFirstConnection < date('now', '-{str(HRS_TO_KEEP_NEWDEV)} hour')"""
mylog("verbose", [f"[{pluginName}] Query: {query} "]) mylog("verbose", [f"[{pluginName}] Query: {query}"])
cursor.execute(query) cursor.execute(query)
mylog("verbose", [f"[{pluginName}] Devices (new) deleted rows: {cursor.rowcount}"])
# ----------------------------------------------------- # -----------------------------------------------------
# Cleanup Offline Devices # Cleanup Offline Devices
if HRS_TO_KEEP_OFFDEV != 0: if HRS_TO_KEEP_OFFDEV != 0:
mylog("verbose", f"[{pluginName}] Devices: Delete all New Devices older than {str(HRS_TO_KEEP_OFFDEV)} hours (HRS_TO_KEEP_OFFDEV setting)") mylog("verbose", f"[{pluginName}] Devices: Delete Offline Devices older than {str(HRS_TO_KEEP_OFFDEV)} hours")
query = f"""DELETE FROM Devices WHERE devPresentLastScan = 0 AND devLastConnection < date('now', '-{str(HRS_TO_KEEP_OFFDEV)} hour')""" query = f"""DELETE FROM Devices WHERE devPresentLastScan = 0 AND devLastConnection < date('now', '-{str(HRS_TO_KEEP_OFFDEV)} hour')"""
mylog("verbose", [f"[{pluginName}] Query: {query} "]) mylog("verbose", [f"[{pluginName}] Query: {query}"])
cursor.execute(query) cursor.execute(query)
mylog("verbose", [f"[{pluginName}] Devices (offline) deleted rows: {cursor.rowcount}"])
# ----------------------------------------------------- # -----------------------------------------------------
# Clear New Flag # Clear New Flag
if CLEAR_NEW_FLAG != 0: if CLEAR_NEW_FLAG != 0:
mylog("verbose", f'[{pluginName}] Devices: Clear "New Device" flag for all devices older than {str(CLEAR_NEW_FLAG)} hours (CLEAR_NEW_FLAG setting)') mylog("verbose", f'[{pluginName}] Devices: Clear "New Device" flag older than {str(CLEAR_NEW_FLAG)} hours')
query = f"""UPDATE Devices SET devIsNew = 0 WHERE devIsNew = 1 AND date(devFirstConnection, '+{str(CLEAR_NEW_FLAG)} hour') < date('now')""" query = f"""UPDATE Devices SET devIsNew = 0 WHERE devIsNew = 1 AND date(devFirstConnection, '+{str(CLEAR_NEW_FLAG)} hour') < date('now')"""
# select * from Devices where devIsNew = 1 AND date(devFirstConnection, '+3 hour' ) < date('now') mylog("verbose", [f"[{pluginName}] Query: {query}"])
mylog("verbose", [f"[{pluginName}] Query: {query} "])
cursor.execute(query) cursor.execute(query)
mylog("verbose", [f"[{pluginName}] Devices updated rows (clear new): {cursor.rowcount}"])
# ----------------------------------------------------- # -----------------------------------------------------
# De-dupe (de-duplicate) from the Plugins_Objects table # De-dupe Plugins_Objects
# TODO This shouldn't be necessary - probably a concurrency bug somewhere in the code :(
mylog("verbose", [f"[{pluginName}] Plugins_Objects: Delete all duplicates"]) mylog("verbose", [f"[{pluginName}] Plugins_Objects: Delete all duplicates"])
cursor.execute( cursor.execute(
""" """
@@ -197,25 +190,20 @@ def cleanup_database(
) )
""" """
) )
mylog("verbose", [f"[{pluginName}] Plugins_Objects deleted rows: {cursor.rowcount}"])
conn.commit() conn.commit()
# Check WAL file size # WAL + Vacuum
cursor.execute("PRAGMA wal_checkpoint(TRUNCATE);") cursor.execute("PRAGMA wal_checkpoint(TRUNCATE);")
cursor.execute("PRAGMA wal_checkpoint(FULL);") cursor.execute("PRAGMA wal_checkpoint(FULL);")
mylog("verbose", [f"[{pluginName}] WAL checkpoint executed to truncate file."]) mylog("verbose", [f"[{pluginName}] WAL checkpoint executed to truncate file."])
# Shrink DB
mylog("verbose", [f"[{pluginName}] Shrink Database"]) mylog("verbose", [f"[{pluginName}] Shrink Database"])
cursor.execute("VACUUM;") cursor.execute("VACUUM;")
# Close the database connection
conn.close() conn.close()
# ===============================================================================
# BEGIN
# ===============================================================================
if __name__ == "__main__": if __name__ == "__main__":
main() main()

View File

@@ -242,6 +242,23 @@ def ensure_Indexes(sql) -> bool:
Parameters: Parameters:
- sql: database cursor or connection wrapper (must support execute()). - sql: database cursor or connection wrapper (must support execute()).
""" """
# Remove after 12/12/2026 - prevens idx_events_unique from failing - dedupe
clean_duplicate_events = """
DELETE FROM Events
WHERE rowid NOT IN (
SELECT MIN(rowid)
FROM Events
GROUP BY
eve_MAC,
eve_IP,
eve_EventType,
eve_DateTime
);
"""
sql.execute(clean_duplicate_events)
indexes = [ indexes = [
# Sessions # Sessions
( (
@@ -269,6 +286,10 @@ def ensure_Indexes(sql) -> bool:
"idx_eve_type_date", "idx_eve_type_date",
"CREATE INDEX idx_eve_type_date ON Events(eve_EventType, eve_DateTime)", "CREATE INDEX idx_eve_type_date ON Events(eve_EventType, eve_DateTime)",
), ),
(
"idx_events_unique",
"CREATE UNIQUE INDEX idx_events_unique ON Events (eve_MAC, eve_IP, eve_EventType, eve_DateTime)",
),
# Devices # Devices
("idx_dev_mac", "CREATE INDEX idx_dev_mac ON Devices(devMac)"), ("idx_dev_mac", "CREATE INDEX idx_dev_mac ON Devices(devMac)"),
( (

View File

@@ -88,7 +88,7 @@ class EventInstance:
def add(self, mac, ip, eventType, info="", pendingAlert=True, pairRow=None): def add(self, mac, ip, eventType, info="", pendingAlert=True, pairRow=None):
conn = self._conn() conn = self._conn()
conn.execute(""" conn.execute("""
INSERT INTO Events ( INSERT OR IGNORE INTO Events (
eve_MAC, eve_IP, eve_DateTime, eve_MAC, eve_IP, eve_DateTime,
eve_EventType, eve_AdditionalInfo, eve_EventType, eve_AdditionalInfo,
eve_PendingAlertEmail, eve_PairEventRowid eve_PendingAlertEmail, eve_PairEventRowid
@@ -124,7 +124,7 @@ class EventInstance:
cur = conn.cursor() cur = conn.cursor()
cur.execute( cur.execute(
""" """
INSERT INTO Events (eve_MAC, eve_IP, eve_DateTime, eve_EventType, eve_AdditionalInfo, eve_PendingAlertEmail) INSERT OR IGNORE INTO Events (eve_MAC, eve_IP, eve_DateTime, eve_EventType, eve_AdditionalInfo, eve_PendingAlertEmail)
VALUES (?, ?, ?, ?, ?, ?) VALUES (?, ?, ?, ?, ?, ?)
""", """,
(mac, ip, start_time, event_type, additional_info, pending_alert), (mac, ip, start_time, event_type, additional_info, pending_alert),

View File

@@ -606,7 +606,7 @@ def create_new_devices(db):
mylog("debug", '[New Devices] Insert "New Device" Events') mylog("debug", '[New Devices] Insert "New Device" Events')
query_new_device_events = f""" query_new_device_events = f"""
INSERT INTO Events ( INSERT OR IGNORE INTO Events (
eve_MAC, eve_IP, eve_DateTime, eve_MAC, eve_IP, eve_DateTime,
eve_EventType, eve_AdditionalInfo, eve_EventType, eve_AdditionalInfo,
eve_PendingAlertEmail eve_PendingAlertEmail

View File

@@ -171,7 +171,7 @@ def insert_events(db):
# Check device down # Check device down
mylog("debug", "[Events] - 1 - Devices down") mylog("debug", "[Events] - 1 - Devices down")
sql.execute(f"""INSERT INTO Events (eve_MAC, eve_IP, eve_DateTime, sql.execute(f"""INSERT OR IGNORE INTO Events (eve_MAC, eve_IP, eve_DateTime,
eve_EventType, eve_AdditionalInfo, eve_EventType, eve_AdditionalInfo,
eve_PendingAlertEmail) eve_PendingAlertEmail)
SELECT devMac, devLastIP, '{startTime}', 'Device Down', '', 1 SELECT devMac, devLastIP, '{startTime}', 'Device Down', '', 1
@@ -184,7 +184,7 @@ def insert_events(db):
# Check new Connections or Down Reconnections # Check new Connections or Down Reconnections
mylog("debug", "[Events] - 2 - New Connections") mylog("debug", "[Events] - 2 - New Connections")
sql.execute(f""" INSERT INTO Events (eve_MAC, eve_IP, eve_DateTime, sql.execute(f""" INSERT OR IGNORE INTO Events (eve_MAC, eve_IP, eve_DateTime,
eve_EventType, eve_AdditionalInfo, eve_EventType, eve_AdditionalInfo,
eve_PendingAlertEmail) eve_PendingAlertEmail)
SELECT DISTINCT c.scanMac, c.scanLastIP, '{startTime}', SELECT DISTINCT c.scanMac, c.scanLastIP, '{startTime}',
@@ -201,7 +201,7 @@ def insert_events(db):
# Check disconnections # Check disconnections
mylog("debug", "[Events] - 3 - Disconnections") mylog("debug", "[Events] - 3 - Disconnections")
sql.execute(f"""INSERT INTO Events (eve_MAC, eve_IP, eve_DateTime, sql.execute(f"""INSERT OR IGNORE INTO Events (eve_MAC, eve_IP, eve_DateTime,
eve_EventType, eve_AdditionalInfo, eve_EventType, eve_AdditionalInfo,
eve_PendingAlertEmail) eve_PendingAlertEmail)
SELECT devMac, devLastIP, '{startTime}', 'Disconnected', '', SELECT devMac, devLastIP, '{startTime}', 'Disconnected', '',
@@ -215,7 +215,7 @@ def insert_events(db):
# Check IP Changed # Check IP Changed
mylog("debug", "[Events] - 4 - IP Changes") mylog("debug", "[Events] - 4 - IP Changes")
sql.execute(f"""INSERT INTO Events (eve_MAC, eve_IP, eve_DateTime, sql.execute(f"""INSERT OR IGNORE INTO Events (eve_MAC, eve_IP, eve_DateTime,
eve_EventType, eve_AdditionalInfo, eve_EventType, eve_AdditionalInfo,
eve_PendingAlertEmail) eve_PendingAlertEmail)
SELECT scanMac, scanLastIP, '{startTime}', 'IP Changed', SELECT scanMac, scanLastIP, '{startTime}', 'IP Changed',

View File

@@ -123,7 +123,7 @@ class TestSafeConditionBuilder(unittest.TestCase):
"'; DROP TABLE Devices; --", "'; DROP TABLE Devices; --",
"' UNION SELECT * FROM Settings --", "' UNION SELECT * FROM Settings --",
"' OR 1=1 --", "' OR 1=1 --",
"'; INSERT INTO Events VALUES(1,2,3); --", "'; INSERT OR IGNORE INTO Events VALUES(1,2,3); --",
"' AND (SELECT COUNT(*) FROM sqlite_master) > 0 --", "' AND (SELECT COUNT(*) FROM sqlite_master) > 0 --",
"'; ATTACH DATABASE '/etc/passwd' AS pwn; --" "'; ATTACH DATABASE '/etc/passwd' AS pwn; --"
] ]

View File

@@ -204,7 +204,7 @@ def test_sql_injection_prevention(builder):
"'; DROP TABLE Events_Devices; --", "'; DROP TABLE Events_Devices; --",
"' OR '1'='1", "' OR '1'='1",
"1' UNION SELECT * FROM Devices --", "1' UNION SELECT * FROM Devices --",
"'; INSERT INTO Events VALUES ('hacked'); --", "'; INSERT OR IGNORE INTO Events VALUES ('hacked'); --",
"' AND (SELECT COUNT(*) FROM sqlite_master) > 0 --" "' AND (SELECT COUNT(*) FROM sqlite_master) > 0 --"
] ]
for payload in malicious_inputs: for payload in malicious_inputs: