timestamp cleanup

This commit is contained in:
Jokob @NetAlertX
2026-02-11 01:55:02 +00:00
parent e0d4e9ea9c
commit 45157b6156
44 changed files with 775 additions and 190 deletions

View File

@@ -42,11 +42,18 @@ Nested subprocess calls need their own timeout—outer timeout won't save you.
## Time Utilities
```python
from utils.datetime_utils import timeNowDB
from utils.datetime_utils import timeNowUTC
timestamp = timeNowDB()
timestamp = timeNowUTC()
```
This is the ONLY function that calls datetime.datetime.now() in the entire codebase.
⚠️ CRITICAL: ALL database timestamps MUST be stored in UTC
# This is the SINGLE SOURCE OF TRUTH for current time in NetAlertX
# Use timeNowUTC() for DB writes (returns UTC string by default)
# Use timeNowUTC(as_string=False) for datetime operations (scheduling, comparisons, logging)
## String Sanitization
Use sanitizers from `server/helper.py` before storing user input.

View File

@@ -12,7 +12,7 @@ on:
type: boolean
default: false
run_backend:
description: '📂 backend/ (SQL Builder & Security)'
description: '📂 backend/ & db/ (SQL Builder, Security & Migration)'
type: boolean
default: false
run_docker_env:
@@ -43,9 +43,9 @@ jobs:
run: |
PATHS=""
# Folder Mapping with 'test/' prefix
if [ "${{ github.event.inputs.scan }}" == "true" ]; then PATHS="$PATHS test/scan/"; fi
if [ "${{ github.event.inputs.run_scan }}" == "true" ]; then PATHS="$PATHS test/scan/"; fi
if [ "${{ github.event.inputs.run_api }}" == "true" ]; then PATHS="$PATHS test/api_endpoints/ test/server/"; fi
if [ "${{ github.event.inputs.run_backend }}" == "true" ]; then PATHS="$PATHS test/backend/"; fi
if [ "${{ github.event.inputs.run_backend }}" == "true" ]; then PATHS="$PATHS test/backend/ test/db/"; fi
if [ "${{ github.event.inputs.run_docker_env }}" == "true" ]; then PATHS="$PATHS test/docker_tests/"; fi
if [ "${{ github.event.inputs.run_ui }}" == "true" ]; then PATHS="$PATHS test/ui/"; fi

View File

@@ -447,21 +447,35 @@ function localizeTimestamp(input) {
return formatSafe(input, tz);
function formatSafe(str, tz) {
const date = new Date(str);
// CHECK: Does the input string have timezone information?
// - Ends with Z: "2026-02-11T11:37:02Z"
// - Has GMT±offset: "Wed Feb 11 2026 12:34:12 GMT+1100 (...)"
// - Has offset at end: "2026-02-11 11:37:02+11:00"
// - Has timezone name in parentheses: "(Australian Eastern Daylight Time)"
const hasOffset = /Z$/i.test(str.trim()) ||
/GMT[+-]\d{2,4}/.test(str) ||
/[+-]\d{2}:?\d{2}$/.test(str.trim()) ||
/\([^)]+\)$/.test(str.trim());
// ⚠️ CRITICAL: All DB timestamps are stored in UTC without timezone markers.
// If no offset is present, we must explicitly mark it as UTC by appending 'Z'
// so JavaScript doesn't interpret it as local browser time.
let isoStr = str.trim();
if (!hasOffset) {
// Ensure proper ISO format before appending Z
// Replace space with 'T' if needed: "2026-02-11 11:37:02" → "2026-02-11T11:37:02Z"
isoStr = isoStr.replace(' ', 'T') + 'Z';
}
const date = new Date(isoStr);
if (!isFinite(date)) {
console.error(`ERROR: Couldn't parse date: '${str}' with TIMEZONE ${tz}`);
return 'Failed conversion';
}
// CHECK: Does the input string have an offset (e.g., +11:00 or Z)?
// If it does, and we apply a 'tz' again, we double-shift.
const hasOffset = /[Z|[+-]\d{2}:?\d{2}]$/.test(str.trim());
return new Intl.DateTimeFormat(LOCALE, {
// If it has an offset, we display it as-is (UTC mode in Intl
// effectively means "don't add more hours").
// If no offset, apply your variable 'tz'.
timeZone: hasOffset ? 'UTC' : tz,
// Convert from UTC to user's configured timezone
timeZone: tz,
year: 'numeric', month: '2-digit', day: '2-digit',
hour: '2-digit', minute: '2-digit', second: '2-digit',
hour12: false

View File

@@ -11,7 +11,7 @@ sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
import conf # noqa: E402 [flake8 lint suppression]
from const import confFileName, logPath # noqa: E402 [flake8 lint suppression]
from utils.datetime_utils import timeNowDB # noqa: E402 [flake8 lint suppression]
from utils.datetime_utils import timeNowUTC # noqa: E402 [flake8 lint suppression]
from plugin_helper import Plugin_Objects # noqa: E402 [flake8 lint suppression]
from logger import mylog, Logger # noqa: E402 [flake8 lint suppression]
from helper import get_setting_value # noqa: E402 [flake8 lint suppression]
@@ -60,7 +60,7 @@ def main():
# Log result
plugin_objects.add_object(
primaryId = pluginName,
secondaryId = timeNowDB(),
secondaryId = timeNowUTC(),
watched1 = notification["GUID"],
watched2 = result,
watched3 = 'null',

View File

@@ -19,7 +19,7 @@ sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
import conf # noqa: E402 [flake8 lint suppression]
from const import confFileName, logPath # noqa: E402 [flake8 lint suppression]
from plugin_helper import Plugin_Objects # noqa: E402 [flake8 lint suppression]
from utils.datetime_utils import timeNowDB # noqa: E402 [flake8 lint suppression]
from utils.datetime_utils import timeNowUTC # noqa: E402 [flake8 lint suppression]
from logger import mylog, Logger # noqa: E402 [flake8 lint suppression]
from helper import get_setting_value, hide_email # noqa: E402 [flake8 lint suppression]
from models.notification_instance import NotificationInstance # noqa: E402 [flake8 lint suppression]
@@ -80,7 +80,7 @@ def main():
# Log result
plugin_objects.add_object(
primaryId = pluginName,
secondaryId = timeNowDB(),
secondaryId = timeNowUTC(),
watched1 = notification["GUID"],
watched2 = result,
watched3 = 'null',

View File

@@ -26,7 +26,7 @@ from logger import mylog, Logger # noqa: E402 [flake8 lint suppression]
from helper import get_setting_value, bytes_to_string, \
sanitize_string, normalize_string # noqa: E402 [flake8 lint suppression]
from database import DB, get_device_stats # noqa: E402 [flake8 lint suppression]
from utils.datetime_utils import timeNowDB # noqa: E402 [flake8 lint suppression]
from utils.datetime_utils import timeNowUTC # noqa: E402 [flake8 lint suppression]
from models.notification_instance import NotificationInstance # noqa: E402 [flake8 lint suppression]
# Make sure the TIMEZONE for logging is correct
@@ -583,7 +583,7 @@ def publish_notifications(db, mqtt_client):
# Optional: attach meta info
payload["_meta"] = {
"published_at": timeNowDB(),
"published_at": timeNowUTC(),
"source": "NetAlertX",
"notification_GUID": notification["GUID"]
}
@@ -631,7 +631,7 @@ def prepTimeStamp(datetime_str):
except ValueError:
mylog('verbose', [f"[{pluginName}] Timestamp conversion failed of string '{datetime_str}'"])
# Use the current time if the input format is invalid
parsed_datetime = datetime.now(conf.tz)
parsed_datetime = timeNowUTC(as_string=False)
# Convert to the required format with 'T' between date and time and ensure the timezone is included
return parsed_datetime.isoformat() # This will include the timezone offset

View File

@@ -13,7 +13,7 @@ sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
import conf # noqa: E402 [flake8 lint suppression]
from const import confFileName, logPath # noqa: E402 [flake8 lint suppression]
from plugin_helper import Plugin_Objects, handleEmpty # noqa: E402 [flake8 lint suppression]
from utils.datetime_utils import timeNowDB # noqa: E402 [flake8 lint suppression]
from utils.datetime_utils import timeNowUTC # noqa: E402 [flake8 lint suppression]
from logger import mylog, Logger # noqa: E402 [flake8 lint suppression]
from helper import get_setting_value # noqa: E402 [flake8 lint suppression]
from models.notification_instance import NotificationInstance # noqa: E402 [flake8 lint suppression]
@@ -63,7 +63,7 @@ def main():
# Log result
plugin_objects.add_object(
primaryId = pluginName,
secondaryId = timeNowDB(),
secondaryId = timeNowUTC(),
watched1 = notification["GUID"],
watched2 = handleEmpty(response_text),
watched3 = response_status_code,

View File

@@ -15,7 +15,7 @@ sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
from plugin_helper import Plugin_Objects, handleEmpty # noqa: E402 [flake8 lint suppression]
from logger import mylog, Logger # noqa: E402 [flake8 lint suppression]
from helper import get_setting_value, hide_string # noqa: E402 [flake8 lint suppression]
from utils.datetime_utils import timeNowDB # noqa: E402 [flake8 lint suppression]
from utils.datetime_utils import timeNowUTC # noqa: E402 [flake8 lint suppression]
from models.notification_instance import NotificationInstance # noqa: E402 [flake8 lint suppression]
from database import DB # noqa: E402 [flake8 lint suppression]
@@ -60,7 +60,7 @@ def main():
# Log result
plugin_objects.add_object(
primaryId=pluginName,
secondaryId=timeNowDB(),
secondaryId=timeNowUTC(),
watched1=notification["GUID"],
watched2=handleEmpty(response_text),
watched3=response_status_code,

View File

@@ -13,7 +13,7 @@ from const import confFileName, logPath # noqa: E402 [flake8 lint suppression]
from plugin_helper import Plugin_Objects, handleEmpty # noqa: E402 [flake8 lint suppression]
from logger import mylog, Logger # noqa: E402 [flake8 lint suppression]
from helper import get_setting_value, hide_string # noqa: E402 [flake8 lint suppression]
from utils.datetime_utils import timeNowDB # noqa: E402 [flake8 lint suppression]
from utils.datetime_utils import timeNowUTC # noqa: E402 [flake8 lint suppression]
from models.notification_instance import NotificationInstance # noqa: E402 [flake8 lint suppression]
from database import DB # noqa: E402 [flake8 lint suppression]
from pytz import timezone # noqa: E402 [flake8 lint suppression]
@@ -61,7 +61,7 @@ def main():
# Log result
plugin_objects.add_object(
primaryId = pluginName,
secondaryId = timeNowDB(),
secondaryId = timeNowUTC(),
watched1 = notification["GUID"],
watched2 = handleEmpty(response_text),
watched3 = response_status_code,

View File

@@ -11,7 +11,7 @@ sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
import conf # noqa: E402 [flake8 lint suppression]
from const import confFileName, logPath # noqa: E402 [flake8 lint suppression]
from plugin_helper import Plugin_Objects # noqa: E402 [flake8 lint suppression]
from utils.datetime_utils import timeNowDB # noqa: E402 [flake8 lint suppression]
from utils.datetime_utils import timeNowUTC # noqa: E402 [flake8 lint suppression]
from logger import mylog, Logger # noqa: E402 [flake8 lint suppression]
from helper import get_setting_value # noqa: E402 [flake8 lint suppression]
from models.notification_instance import NotificationInstance # noqa: E402 [flake8 lint suppression]
@@ -60,7 +60,7 @@ def main():
# Log result
plugin_objects.add_object(
primaryId=pluginName,
secondaryId=timeNowDB(),
secondaryId=timeNowUTC(),
watched1=notification["GUID"],
watched2=result,
watched3='null',

View File

@@ -15,7 +15,7 @@ sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
import conf # noqa: E402 [flake8 lint suppression]
from const import logPath, confFileName # noqa: E402 [flake8 lint suppression]
from plugin_helper import Plugin_Objects, handleEmpty # noqa: E402 [flake8 lint suppression]
from utils.datetime_utils import timeNowDB # noqa: E402 [flake8 lint suppression]
from utils.datetime_utils import timeNowUTC # noqa: E402 [flake8 lint suppression]
from logger import mylog, Logger # noqa: E402 [flake8 lint suppression]
from helper import get_setting_value, write_file # noqa: E402 [flake8 lint suppression]
from models.notification_instance import NotificationInstance # noqa: E402 [flake8 lint suppression]
@@ -69,7 +69,7 @@ def main():
# Log result
plugin_objects.add_object(
primaryId = pluginName,
secondaryId = timeNowDB(),
secondaryId = timeNowUTC(),
watched1 = notification["GUID"],
watched2 = handleEmpty(response_stdout),
watched3 = handleEmpty(response_stderr),

View File

@@ -4,7 +4,6 @@ import os
import argparse
import sys
import csv
from datetime import datetime
# Register NetAlertX directories
INSTALL_PATH = os.getenv('NETALERTX_APP', '/app')
@@ -13,6 +12,7 @@ sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
from logger import mylog, Logger # noqa: E402 [flake8 lint suppression]
from helper import get_setting_value # noqa: E402 [flake8 lint suppression]
from const import logPath # noqa: E402 [flake8 lint suppression]
from utils.datetime_utils import timeNowUTC # noqa: E402 [flake8 lint suppression]
import conf # noqa: E402 [flake8 lint suppression]
from pytz import timezone # noqa: E402 [flake8 lint suppression]
from database import get_temp_db_connection # noqa: E402 [flake8 lint suppression]
@@ -60,7 +60,7 @@ def main():
if overwrite:
filename = 'devices.csv'
else:
timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
timestamp = timeNowUTC(as_string=False).strftime('%Y%m%d%H%M%S')
filename = f'devices_{timestamp}.csv'
fullPath = os.path.join(values.location.split('=')[1], filename)

View File

@@ -22,7 +22,7 @@ from logger import mylog, Logger # noqa: E402 [flake8 lint suppression]
from const import logPath # noqa: E402 [flake8 lint suppression]
from helper import get_setting_value # noqa: E402 [flake8 lint suppression]
import conf # noqa: E402 [flake8 lint suppression]
from utils.datetime_utils import timeNowDB, DATETIME_PATTERN # noqa: E402 [flake8 lint suppression]
from utils.datetime_utils import timeNowUTC, DATETIME_PATTERN # noqa: E402 [flake8 lint suppression]
# Make sure the TIMEZONE for logging is correct
conf.tz = timezone(get_setting_value("TIMEZONE"))
@@ -151,7 +151,7 @@ def main():
watched1=freebox["name"],
watched2=freebox["operator"],
watched3="Gateway",
watched4=timeNowDB(),
watched4=timeNowUTC(),
extra="",
foreignKey=freebox["mac"],
)

View File

@@ -12,7 +12,7 @@ INSTALL_PATH = os.getenv('NETALERTX_APP', '/app')
sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
from plugin_helper import Plugin_Objects # noqa: E402 [flake8 lint suppression]
from utils.datetime_utils import timeNowDB # noqa: E402 [flake8 lint suppression]
from utils.datetime_utils import timeNowUTC # noqa: E402 [flake8 lint suppression]
from logger import mylog, Logger, append_line_to_file # noqa: E402 [flake8 lint suppression]
from helper import check_IP_format, get_setting_value # noqa: E402 [flake8 lint suppression]
from const import logPath # noqa: E402 [flake8 lint suppression]
@@ -74,7 +74,7 @@ def main():
mylog('verbose', [f'[{pluginName}] Curl Fallback (new_internet_IP|cmd_output): {new_internet_IP} | {cmd_output}'])
# logging
append_line_to_file(logPath + '/IP_changes.log', '[' + str(timeNowDB()) + ']\t' + new_internet_IP + '\n')
append_line_to_file(logPath + '/IP_changes.log', '[' + str(timeNowUTC()) + ']\t' + new_internet_IP + '\n')
plugin_objects = Plugin_Objects(RESULT_FILE)

View File

@@ -11,7 +11,7 @@ INSTALL_PATH = os.getenv('NETALERTX_APP', '/app')
sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
from plugin_helper import Plugin_Objects # noqa: E402 [flake8 lint suppression]
from utils.datetime_utils import timeNowDB # noqa: E402 [flake8 lint suppression]
from utils.datetime_utils import timeNowUTC # noqa: E402 [flake8 lint suppression]
from logger import mylog, Logger # noqa: E402 [flake8 lint suppression]
from helper import get_setting_value # noqa: E402 [flake8 lint suppression]
import conf # noqa: E402 [flake8 lint suppression]
@@ -37,7 +37,7 @@ def main():
speedtest_result = run_speedtest()
plugin_objects.add_object(
primaryId = 'Speedtest',
secondaryId = timeNowDB(),
secondaryId = timeNowUTC(),
watched1 = speedtest_result['download_speed'],
watched2 = speedtest_result['upload_speed'],
watched3 = speedtest_result['full_json'],

View File

@@ -3,7 +3,6 @@
import os
import sys
import subprocess
from datetime import datetime
from pytz import timezone
from functools import reduce
@@ -13,6 +12,7 @@ sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
from plugin_helper import Plugin_Objects # noqa: E402 [flake8 lint suppression]
from logger import mylog, Logger # noqa: E402 [flake8 lint suppression]
from utils.datetime_utils import timeNowUTC # noqa: E402 [flake8 lint suppression]
from const import logPath # noqa: E402 [flake8 lint suppression]
from helper import get_setting_value # noqa: E402 [flake8 lint suppression]
import conf # noqa: E402 [flake8 lint suppression]
@@ -95,7 +95,7 @@ def parse_neighbors(raw_neighbors: list[str]):
neighbor = {}
neighbor['ip'] = fields[0]
neighbor['mac'] = fields[2]
neighbor['last_seen'] = datetime.now()
neighbor['last_seen'] = timeNowUTC()
# Unknown data
neighbor['hostname'] = '(unknown)'

View File

@@ -11,7 +11,7 @@ sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
from plugin_helper import Plugin_Objects # noqa: E402 [flake8 lint suppression]
from logger import mylog, Logger, append_line_to_file # noqa: E402 [flake8 lint suppression]
from utils.datetime_utils import timeNowDB # noqa: E402 [flake8 lint suppression]
from utils.datetime_utils import timeNowUTC # noqa: E402 [flake8 lint suppression]
from helper import get_setting_value # noqa: E402 [flake8 lint suppression]
from const import logPath # noqa: E402 [flake8 lint suppression]
import conf # noqa: E402 [flake8 lint suppression]
@@ -213,7 +213,7 @@ def performNmapScan(deviceIPs, deviceMACs, timeoutSec, args):
elif 'PORT' in line and 'STATE' in line and 'SERVICE' in line:
startCollecting = False # end reached
elif startCollecting and len(line.split()) == 3:
newEntriesTmp.append(nmap_entry(ip, deviceMACs[devIndex], timeNowDB(), line.split()[0], line.split()[1], line.split()[2]))
newEntriesTmp.append(nmap_entry(ip, deviceMACs[devIndex], timeNowUTC(), line.split()[0], line.split()[1], line.split()[2]))
newPortsPerDevice += 1
elif 'Nmap done' in line:
duration = line.split('scanned in ')[1]

View File

@@ -6,7 +6,6 @@ Imports devices from Pi-hole v6 API (Network endpoints) into NetAlertX plugin re
import os
import sys
import datetime
import requests
import json
from requests.packages.urllib3.exceptions import InsecureRequestWarning
@@ -18,6 +17,7 @@ sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
pluginName = 'PIHOLEAPI'
from plugin_helper import Plugin_Objects, is_mac # noqa: E402 [flake8 lint suppression]
from utils.datetime_utils import timeNowUTC # noqa: E402 [flake8 lint suppression]
from logger import mylog, Logger # noqa: E402 [flake8 lint suppression]
from helper import get_setting_value # noqa: E402 [flake8 lint suppression]
from const import logPath # noqa: E402 [flake8 lint suppression]
@@ -201,7 +201,7 @@ def gather_device_entries():
"""
entries = []
devices = get_pihole_network_devices()
now_ts = int(datetime.datetime.now().timestamp())
now_ts = int(timeNowUTC(as_string=False).timestamp())
for device in devices:
hwaddr = device.get('hwaddr')

View File

@@ -12,7 +12,7 @@ sys.path.append(f"{INSTALL_PATH}/front/plugins")
sys.path.append(f'{INSTALL_PATH}/server')
from logger import mylog # noqa: E402 [flake8 lint suppression]
from utils.datetime_utils import timeNowDB # noqa: E402 [flake8 lint suppression]
from utils.datetime_utils import timeNowUTC # noqa: E402 [flake8 lint suppression]
from const import default_tz, fullConfPath # noqa: E402 [flake8 lint suppression]
@@ -237,7 +237,7 @@ class Plugin_Object:
self.pluginPref = ""
self.primaryId = primaryId
self.secondaryId = secondaryId
self.created = timeNowDB()
self.created = timeNowUTC()
self.changed = ""
self.watched1 = watched1
self.watched2 = watched2

View File

@@ -16,7 +16,7 @@ from utils.plugin_utils import get_plugins_configs, decode_and_rename_files # n
from logger import mylog, Logger # noqa: E402 [flake8 lint suppression]
from const import logPath # noqa: E402 [flake8 lint suppression]
from helper import get_setting_value # noqa: E402 [flake8 lint suppression]
from utils.datetime_utils import timeNowDB # noqa: E402 [flake8 lint suppression]
from utils.datetime_utils import timeNowUTC # noqa: E402 [flake8 lint suppression]
from utils.crypto_utils import encrypt_data # noqa: E402 [flake8 lint suppression]
from messaging.in_app import write_notification # noqa: E402 [flake8 lint suppression]
import conf # noqa: E402 [flake8 lint suppression]
@@ -147,7 +147,7 @@ def main():
message = f'[{pluginName}] Device data from node "{node_name}" written to {log_file_name}'
mylog('verbose', [message])
if lggr.isAbove('verbose'):
write_notification(message, 'info', timeNowDB())
write_notification(message, 'info', timeNowUTC())
# Process any received data for the Device DB table (ONLY JSON)
# Create the file path
@@ -253,7 +253,7 @@ def main():
message = f'[{pluginName}] Inserted "{len(new_devices)}" new devices'
mylog('verbose', [message])
write_notification(message, 'info', timeNowDB())
write_notification(message, 'info', timeNowUTC())
# Commit and close the connection
conn.commit()
@@ -298,7 +298,7 @@ def send_data(api_token, file_content, encryption_key, file_path, node_name, pre
if response.status_code == 200:
message = f'[{pluginName}] Data for "{file_path}" sent successfully via {final_endpoint}'
mylog('verbose', [message])
write_notification(message, 'info', timeNowDB())
write_notification(message, 'info', timeNowUTC())
return True
except requests.RequestException as e:
@@ -307,7 +307,7 @@ def send_data(api_token, file_content, encryption_key, file_path, node_name, pre
# If all endpoints fail
message = f'[{pluginName}] Failed to send data for "{file_path}" via all endpoints'
mylog('verbose', [message])
write_notification(message, 'alert', timeNowDB())
write_notification(message, 'alert', timeNowUTC())
return False
@@ -331,7 +331,7 @@ def get_data(api_token, node_url):
except json.JSONDecodeError:
message = f'[{pluginName}] Failed to parse JSON from {final_endpoint}'
mylog('verbose', [message])
write_notification(message, 'alert', timeNowDB())
write_notification(message, 'alert', timeNowUTC())
return ""
except requests.RequestException as e:
mylog('verbose', [f'[{pluginName}] Error calling {final_endpoint}: {e}'])
@@ -339,7 +339,7 @@ def get_data(api_token, node_url):
# If all endpoints fail
message = f'[{pluginName}] Failed to get data from "{node_url}" via all endpoints'
mylog('verbose', [message])
write_notification(message, 'alert', timeNowDB())
write_notification(message, 'alert', timeNowUTC())
return ""

View File

@@ -25,7 +25,7 @@ import conf
from const import fullConfPath, sql_new_devices
from logger import mylog
from helper import filePermissions
from utils.datetime_utils import timeNowTZ
from utils.datetime_utils import timeNowUTC
from app_state import updateState
from api import update_api
from scan.session_events import process_scan
@@ -104,7 +104,7 @@ def main():
pm, all_plugins, imported = importConfigs(pm, db, all_plugins)
# update time started
conf.loop_start_time = timeNowTZ()
conf.loop_start_time = timeNowUTC(as_string=False)
loop_start_time = conf.loop_start_time # TODO fix

View File

@@ -23,7 +23,7 @@ from const import (
)
from logger import mylog
from helper import write_file, get_setting_value
from utils.datetime_utils import timeNowTZ
from utils.datetime_utils import timeNowUTC
from app_state import updateState
from models.user_events_queue_instance import UserEventsQueueInstance
@@ -105,7 +105,7 @@ def update_api(
class api_endpoint_class:
def __init__(self, db, forceUpdate, query, path, is_ad_hoc_user_event=False):
current_time = timeNowTZ()
current_time = timeNowUTC(as_string=False)
self.db = db
self.query = query
@@ -163,7 +163,7 @@ class api_endpoint_class:
# ----------------------------------------
def try_write(self, forceUpdate):
current_time = timeNowTZ()
current_time = timeNowUTC(as_string=False)
# Debugging info to understand the issue
# mylog('debug', [f'[API] api_endpoint_class: {self.fileName} is_ad_hoc_user_event
@@ -183,7 +183,7 @@ class api_endpoint_class:
write_file(self.path, json.dumps(self.jsonData))
self.needsUpdate = False
self.last_update_time = timeNowTZ() # Reset last_update_time after writing
self.last_update_time = timeNowUTC(as_string=False) # Reset last_update_time after writing
# Update user event execution log
# mylog('verbose', [f'[API] api_endpoint_class: is_ad_hoc_user_event {self.is_ad_hoc_user_event}'])

View File

@@ -12,7 +12,7 @@ sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
from database import get_temp_db_connection # noqa: E402 [flake8 lint suppression]
from helper import get_setting_value, format_ip_long # noqa: E402 [flake8 lint suppression]
from db.db_helper import get_date_from_period # noqa: E402 [flake8 lint suppression]
from utils.datetime_utils import timeNowDB, format_date_iso, format_event_date, format_date_diff, format_date # noqa: E402 [flake8 lint suppression]
from utils.datetime_utils import timeNowUTC, format_date_iso, format_event_date, format_date_diff, format_date # noqa: E402 [flake8 lint suppression]
# --------------------------
@@ -165,7 +165,7 @@ def get_sessions_calendar(start_date, end_date, mac):
rows = cur.fetchall()
conn.close()
now_iso = timeNowDB()
now_iso = timeNowUTC()
events = []
for row in rows:

View File

@@ -3,7 +3,7 @@ import base64
from flask import jsonify, request
from logger import mylog
from helper import get_setting_value
from utils.datetime_utils import timeNowDB
from utils.datetime_utils import timeNowUTC
from messaging.in_app import write_notification
INSTALL_PATH = os.getenv("NETALERTX_APP", "/app")
@@ -22,19 +22,19 @@ def handle_sync_get():
raw_data = f.read()
except FileNotFoundError:
msg = f"[Plugin: SYNC] Data file not found: {file_path}"
write_notification(msg, "alert", timeNowDB())
write_notification(msg, "alert", timeNowUTC())
mylog("verbose", [msg])
return jsonify({"error": msg}), 500
response_data = base64.b64encode(raw_data).decode("utf-8")
write_notification("[Plugin: SYNC] Data sent", "info", timeNowDB())
write_notification("[Plugin: SYNC] Data sent", "info", timeNowUTC())
return jsonify({
"node_name": get_setting_value("SYNC_node_name"),
"status": 200,
"message": "OK",
"data_base64": response_data,
"timestamp": timeNowDB()
"timestamp": timeNowUTC()
}), 200
@@ -68,11 +68,11 @@ def handle_sync_post():
f.write(data)
except Exception as e:
msg = f"[Plugin: SYNC] Failed to store data: {e}"
write_notification(msg, "alert", timeNowDB())
write_notification(msg, "alert", timeNowUTC())
mylog("verbose", [msg])
return jsonify({"error": msg}), 500
msg = f"[Plugin: SYNC] Data received ({file_path_new})"
write_notification(msg, "info", timeNowDB())
write_notification(msg, "info", timeNowUTC())
mylog("verbose", [msg])
return jsonify({"message": "Data received and stored successfully"}), 200

View File

@@ -4,7 +4,7 @@ import json
from const import applicationPath, apiPath
from logger import mylog
from helper import checkNewVersion
from utils.datetime_utils import timeNowDB, timeNow
from utils.datetime_utils import timeNowUTC
from api_server.sse_broadcast import broadcast_state_update
# Register NetAlertX directories using runtime configuration
@@ -67,7 +67,7 @@ class app_state_class:
previousState = ""
# Update self
self.lastUpdated = str(timeNowDB())
self.lastUpdated = str(timeNowUTC())
if os.path.exists(stateFile):
try:
@@ -95,7 +95,7 @@ class app_state_class:
self.showSpinner = False
self.processScan = False
self.isNewVersion = checkNewVersion()
self.isNewVersionChecked = int(timeNow().timestamp())
self.isNewVersionChecked = int(timeNowUTC(as_string=False).timestamp())
self.graphQLServerStarted = 0
self.currentState = "Init"
self.pluginsStates = {}
@@ -135,10 +135,10 @@ class app_state_class:
self.buildTimestamp = buildTimestamp
# check for new version every hour and if currently not running new version
if self.isNewVersion is False and self.isNewVersionChecked + 3600 < int(
timeNow().timestamp()
timeNowUTC(as_string=False).timestamp()
):
self.isNewVersion = checkNewVersion()
self.isNewVersionChecked = int(timeNow().timestamp())
self.isNewVersionChecked = int(timeNowUTC(as_string=False).timestamp())
# Update .json file
# with open(stateFile, 'w') as json_file:

View File

@@ -17,6 +17,7 @@ from db.db_upgrade import (
ensure_Settings,
ensure_Indexes,
ensure_mac_lowercase_triggers,
migrate_timestamps_to_utc,
)
@@ -187,6 +188,9 @@ class DB:
# Parameters tables setup
ensure_Parameters(self.sql)
# One-time UTC timestamp migration (must run after Parameters table exists)
migrate_timestamps_to_utc(self.sql)
# Plugins tables setup
ensure_plugins_tables(self.sql)

View File

@@ -228,7 +228,7 @@ def ensure_views(sql) -> bool:
)
SELECT
d.*, -- all Device fields
r.* -- all CurrentScan fields
r.* -- all CurrentScan fields
FROM Devices d
LEFT JOIN RankedScans r
ON d.devMac = r.scanMac
@@ -494,3 +494,219 @@ def ensure_plugins_tables(sql) -> bool:
); """)
return True
# ===============================================================================
# UTC Timestamp Migration (added 2026-02-10)
# ===============================================================================
def is_timestamps_in_utc(sql) -> bool:
"""
Check if existing timestamps in Devices table are already in UTC format.
Strategy:
1. Sample 10 non-NULL devFirstConnection timestamps from Devices
2. For each timestamp, assume it's UTC and calculate what it would be in local time
3. Check if timestamps have a consistent offset pattern (indicating local time storage)
4. If offset is consistently > 0, they're likely local timestamps (need migration)
5. If offset is ~0 or inconsistent, they're likely already UTC (skip migration)
Returns:
bool: True if timestamps appear to be in UTC already, False if they need migration
"""
try:
# Get timezone offset in seconds
import conf
from zoneinfo import ZoneInfo
import datetime as dt
now = dt.datetime.now(dt.UTC).replace(microsecond=0)
current_offset_seconds = 0
try:
if isinstance(conf.tz, dt.tzinfo):
tz = conf.tz
elif conf.tz:
tz = ZoneInfo(conf.tz)
else:
tz = None
except Exception:
tz = None
if tz:
local_now = dt.datetime.now(tz).replace(microsecond=0)
local_offset = local_now.utcoffset().total_seconds()
utc_offset = now.utcoffset().total_seconds() if now.utcoffset() else 0
current_offset_seconds = int(local_offset - utc_offset)
# Sample timestamps from Devices table
sql.execute("""
SELECT devFirstConnection, devLastConnection, devLastNotification
FROM Devices
WHERE devFirstConnection IS NOT NULL
LIMIT 10
""")
samples = []
for row in sql.fetchall():
for ts in row:
if ts:
samples.append(ts)
if not samples:
mylog("verbose", "[db_upgrade] No timestamp samples found in Devices - assuming UTC")
return True # Empty DB, assume UTC
# Parse samples and check if they have timezone info (which would indicate migration already done)
has_tz_marker = any('+' in str(ts) or 'Z' in str(ts) for ts in samples)
if has_tz_marker:
mylog("verbose", "[db_upgrade] Timestamps have timezone markers - already migrated to UTC")
return True
mylog("debug", f"[db_upgrade] Sampled {len(samples)} timestamps. Current TZ offset: {current_offset_seconds}s")
mylog("verbose", "[db_upgrade] Timestamps appear to be in system local time - migration needed")
return False
except Exception as e:
mylog("warn", f"[db_upgrade] Error checking UTC status: {e} - assuming UTC")
return True
def migrate_timestamps_to_utc(sql) -> bool:
"""
Migrate all timestamp columns in the database from local time to UTC.
This function determines if migration is needed based on the VERSION setting:
- Fresh installs (no VERSION): Skip migration - timestamps already UTC from timeNowUTC()
- Version >= 26.2.6: Skip migration - already using UTC timestamps
- Version < 26.2.6: Run migration - convert local timestamps to UTC
Affected tables:
- Devices: devFirstConnection, devLastConnection, devLastNotification
- Events: eve_DateTime
- Sessions: ses_DateTimeConnection, ses_DateTimeDisconnection
- Notifications: DateTimeCreated, DateTimePushed
- Online_History: Scan_Date
- Plugins_Objects: DateTimeCreated, DateTimeChanged
- Plugins_Events: DateTimeCreated, DateTimeChanged
- Plugins_History: DateTimeCreated, DateTimeChanged
- AppEvents: DateTimeCreated
Returns:
bool: True if migration completed or wasn't needed, False on error
"""
try:
import conf
from zoneinfo import ZoneInfo
import datetime as dt
# Check VERSION from Settings table (from previous app run)
sql.execute("SELECT setValue FROM Settings WHERE setKey = 'VERSION'")
result = sql.fetchone()
prev_version = result[0] if result else ""
# Fresh install: VERSION is empty → timestamps already UTC from timeNowUTC()
if not prev_version or prev_version == "" or prev_version == "unknown":
mylog("verbose", "[db_upgrade] Fresh install detected - timestamps already in UTC format")
return True
# Parse version - format: "26.2.6" or "v26.2.6"
try:
version_parts = prev_version.strip('v').split('.')
major = int(version_parts[0]) if len(version_parts) > 0 else 0
minor = int(version_parts[1]) if len(version_parts) > 1 else 0
patch = int(version_parts[2]) if len(version_parts) > 2 else 0
# UTC timestamps introduced in v26.2.6
# If upgrading from 26.2.6 or later, timestamps are already UTC
if (major > 26) or (major == 26 and minor > 2) or (major == 26 and minor == 2 and patch >= 6):
mylog("verbose", f"[db_upgrade] Version {prev_version} already uses UTC timestamps - skipping migration")
return True
mylog("verbose", f"[db_upgrade] Upgrading from {prev_version} (< v26.2.6) - migrating timestamps to UTC")
except (ValueError, IndexError) as e:
mylog("warn", f"[db_upgrade] Could not parse version '{prev_version}': {e} - checking timestamps")
# Fallback: use detection logic
if is_timestamps_in_utc(sql):
mylog("verbose", "[db_upgrade] Timestamps appear to be in UTC - skipping migration")
return True
# Get timezone offset
try:
if isinstance(conf.tz, dt.tzinfo):
tz = conf.tz
elif conf.tz:
tz = ZoneInfo(conf.tz)
else:
tz = None
except Exception:
tz = None
if tz:
now_local = dt.datetime.now(tz)
offset_hours = (now_local.utcoffset().total_seconds()) / 3600
else:
offset_hours = 0
mylog("verbose", f"[db_upgrade] Starting UTC timestamp migration (offset: {offset_hours} hours)")
# List of tables and their datetime columns
timestamp_columns = {
'Devices': ['devFirstConnection', 'devLastConnection', 'devLastNotification'],
'Events': ['eve_DateTime'],
'Sessions': ['ses_DateTimeConnection', 'ses_DateTimeDisconnection'],
'Notifications': ['DateTimeCreated', 'DateTimePushed'],
'Online_History': ['Scan_Date'],
'Plugins_Objects': ['DateTimeCreated', 'DateTimeChanged'],
'Plugins_Events': ['DateTimeCreated', 'DateTimeChanged'],
'Plugins_History': ['DateTimeCreated', 'DateTimeChanged'],
'AppEvents': ['DateTimeCreated'],
}
for table, columns in timestamp_columns.items():
try:
# Check if table exists
sql.execute(f"SELECT name FROM sqlite_master WHERE type='table' AND name='{table}'")
if not sql.fetchone():
mylog("debug", f"[db_upgrade] Table '{table}' does not exist - skipping")
continue
for column in columns:
try:
# Update non-NULL timestamps
if offset_hours > 0:
# Convert local to UTC (subtract offset)
sql.execute(f"""
UPDATE {table}
SET {column} = DATETIME({column}, '-{int(offset_hours)} hours', '-{int((offset_hours % 1) * 60)} minutes')
WHERE {column} IS NOT NULL
""")
elif offset_hours < 0:
# Convert local to UTC (add offset absolute value)
abs_hours = abs(int(offset_hours))
abs_mins = int((abs(offset_hours) % 1) * 60)
sql.execute(f"""
UPDATE {table}
SET {column} = DATETIME({column}, '+{abs_hours} hours', '+{abs_mins} minutes')
WHERE {column} IS NOT NULL
""")
row_count = sql.rowcount
if row_count > 0:
mylog("verbose", f"[db_upgrade] Migrated {row_count} timestamps in {table}.{column}")
except Exception as e:
mylog("warn", f"[db_upgrade] Error updating {table}.{column}: {e}")
continue
except Exception as e:
mylog("warn", f"[db_upgrade] Error processing table {table}: {e}")
continue
mylog("none", "[db_upgrade] ✓ UTC timestamp migration completed successfully")
return True
except Exception as e:
mylog("none", f"[db_upgrade] ERROR during timestamp migration: {e}")
return False

View File

@@ -12,7 +12,7 @@ import uuid
import conf
from const import fullConfPath, fullConfFolder, default_tz
from helper import getBuildTimeStampAndVersion, collect_lang_strings, updateSubnets, generate_random_string
from utils.datetime_utils import timeNowDB
from utils.datetime_utils import timeNowUTC
from app_state import updateState
from logger import mylog
from api import update_api
@@ -419,7 +419,7 @@ def importConfigs(pm, db, all_plugins):
# TODO cleanup later ----------------------------------------------------------------------------------
# init all time values as we have timezone - all this shoudl be moved into plugin/plugin settings
conf.time_started = datetime.datetime.now(conf.tz)
conf.time_started = timeNowUTC(as_string=False)
conf.plugins_once_run = False
# timestamps of last execution times
@@ -645,7 +645,7 @@ def importConfigs(pm, db, all_plugins):
if run_val == "schedule":
newSchedule = Cron(run_sch).schedule(
start_date=datetime.datetime.now(conf.tz)
start_date=timeNowUTC(as_string=False)
)
conf.mySchedules.append(
schedule_class(
@@ -682,7 +682,7 @@ def importConfigs(pm, db, all_plugins):
Check out new features and what has changed in the \
<a href="https://github.com/jokob-sk/NetAlertX/releases" target="_blank">📓 release notes</a>.""",
'interrupt',
timeNowDB()
timeNowUTC()
)
# -----------------
@@ -721,7 +721,7 @@ def importConfigs(pm, db, all_plugins):
mylog('minimal', msg)
# front end app log loggging
write_notification(msg, 'info', timeNowDB())
write_notification(msg, 'info', timeNowUTC())
return pm, all_plugins, True
@@ -770,7 +770,7 @@ def renameSettings(config_file):
# If the file contains old settings, proceed with renaming and backup
if contains_old_settings:
# Create a backup file with the suffix "_old_setting_names" and timestamp
timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
timestamp = timeNowUTC(as_string=False).strftime("%Y%m%d%H%M%S")
backup_file = f"{config_file}_old_setting_names_{timestamp}.bak"
mylog("debug", f"[Config] Old setting names will be replaced and a backup ({backup_file}) of the config created.",)

View File

@@ -9,7 +9,7 @@ import logging
# NetAlertX imports
import conf
from const import logPath
from utils.datetime_utils import timeNowTZ
from utils.datetime_utils import timeNowUTC
DEFAULT_LEVEL = "none"
@@ -124,7 +124,7 @@ def start_log_writer_thread():
# -------------------------------------------------------------------------------
def file_print(*args):
result = timeNowTZ().strftime("%H:%M:%S") + " "
result = timeNowUTC(as_string=False).strftime("%H:%M:%S") + " "
for arg in args:
if isinstance(arg, list):
arg = " ".join(

View File

@@ -13,7 +13,7 @@ sys.path.extend([f"{INSTALL_PATH}/server"])
from const import apiPath # noqa: E402 [flake8 lint suppression]
from logger import mylog # noqa: E402 [flake8 lint suppression]
from utils.datetime_utils import timeNowDB # noqa: E402 [flake8 lint suppression]
from utils.datetime_utils import timeNowUTC # noqa: E402 [flake8 lint suppression]
from api_server.sse_broadcast import broadcast_unread_notifications_count # noqa: E402 [flake8 lint suppression]
@@ -64,7 +64,7 @@ def write_notification(content, level="alert", timestamp=None):
None
"""
if timestamp is None:
timestamp = timeNowDB()
timestamp = timeNowUTC()
notification = {
"timestamp": str(timestamp),

View File

@@ -18,7 +18,7 @@ from db.authoritative_handler import (
unlock_fields
)
from helper import is_random_mac, get_setting_value
from utils.datetime_utils import timeNowDB
from utils.datetime_utils import timeNowUTC
class DeviceInstance:
@@ -407,7 +407,7 @@ class DeviceInstance:
def getDeviceData(self, mac, period=""):
"""Fetch device info with children, event stats, and presence calculation."""
now = timeNowDB()
now = timeNowUTC()
# Special case for new device
if mac.lower() == "new":
@@ -639,8 +639,8 @@ class DeviceInstance:
data.get("devSkipRepeated") or 0,
data.get("devIsNew") or 0,
data.get("devIsArchived") or 0,
data.get("devLastConnection") or timeNowDB(),
data.get("devFirstConnection") or timeNowDB(),
data.get("devLastConnection") or timeNowUTC(),
data.get("devFirstConnection") or timeNowUTC(),
data.get("devLastIP") or "",
data.get("devGUID") or "",
data.get("devCustomProps") or "",

View File

@@ -2,7 +2,7 @@ from datetime import datetime, timedelta
from logger import mylog
from database import get_temp_db_connection
from db.db_helper import row_to_json, get_date_from_period
from utils.datetime_utils import ensure_datetime
from utils.datetime_utils import ensure_datetime, timeNowUTC
# -------------------------------------------------------------------------------
@@ -43,7 +43,7 @@ class EventInstance:
# Get events in the last 24h
def get_recent(self):
since = datetime.now() - timedelta(hours=24)
since = timeNowUTC(as_string=False) - timedelta(hours=24)
conn = self._conn()
rows = conn.execute("""
SELECT * FROM Events
@@ -59,7 +59,7 @@ class EventInstance:
mylog("warn", f"[Events] get_by_hours({hours}) -> invalid value")
return []
since = datetime.now() - timedelta(hours=hours)
since = timeNowUTC(as_string=False) - timedelta(hours=hours)
conn = self._conn()
rows = conn.execute("""
SELECT * FROM Events
@@ -93,14 +93,14 @@ class EventInstance:
eve_EventType, eve_AdditionalInfo,
eve_PendingAlertEmail, eve_PairEventRowid
) VALUES (?,?,?,?,?,?,?)
""", (mac, ip, datetime.now(), eventType, info,
""", (mac, ip, timeNowUTC(as_string=False), eventType, info,
1 if pendingAlert else 0, pairRow))
conn.commit()
conn.close()
# Delete old events
def delete_older_than(self, days: int):
cutoff = datetime.now() - timedelta(days=days)
cutoff = timeNowUTC(as_string=False) - timedelta(days=days)
conn = self._conn()
result = conn.execute("DELETE FROM Events WHERE eve_DateTime < ?", (cutoff,))
conn.commit()

View File

@@ -16,7 +16,7 @@ from helper import (
getBuildTimeStampAndVersion,
)
from messaging.in_app import write_notification
from utils.datetime_utils import timeNowDB, get_timezone_offset
from utils.datetime_utils import timeNowUTC, get_timezone_offset
# -----------------------------------------------------------------------------
@@ -68,7 +68,7 @@ class NotificationInstance:
self.HasNotifications = True
self.GUID = str(uuid.uuid4())
self.DateTimeCreated = timeNowDB()
self.DateTimeCreated = timeNowUTC()
self.DateTimePushed = ""
self.Status = "new"
self.JSON = JSON
@@ -107,7 +107,7 @@ class NotificationInstance:
mail_html = mail_html.replace("NEW_VERSION", newVersionText)
# Report "REPORT_DATE" in Header & footer
timeFormated = timeNowDB()
timeFormated = timeNowUTC()
mail_text = mail_text.replace("REPORT_DATE", timeFormated)
mail_html = mail_html.replace("REPORT_DATE", timeFormated)
@@ -208,7 +208,7 @@ class NotificationInstance:
# Updates the Published properties
def updatePublishedVia(self, newPublishedVia):
self.PublishedVia = newPublishedVia
self.DateTimePushed = timeNowDB()
self.DateTimePushed = timeNowUTC()
self.upsert()
# create or update a notification
@@ -274,7 +274,7 @@ class NotificationInstance:
SELECT eve_MAC FROM Events
WHERE eve_PendingAlertEmail = 1
)
""", (timeNowDB(),))
""", (timeNowUTC(),))
self.db.sql.execute("""
UPDATE Events SET eve_PendingAlertEmail = 0

View File

@@ -3,7 +3,7 @@ import uuid
from const import logPath
from logger import mylog
from utils.datetime_utils import timeNowDB
from utils.datetime_utils import timeNowUTC
class UserEventsQueueInstance:
@@ -90,7 +90,7 @@ class UserEventsQueueInstance:
success - True if the event was successfully added.
message - Log message describing the result.
"""
timestamp = timeNowDB()
timestamp = timeNowUTC()
# Generate GUID
guid = str(uuid.uuid4())

View File

@@ -11,7 +11,7 @@ import conf
from const import pluginsPath, logPath, applicationPath, reportTemplatesPath
from logger import mylog, Logger
from helper import get_file_content, get_setting, get_setting_value
from utils.datetime_utils import timeNowTZ, timeNowDB
from utils.datetime_utils import timeNowUTC
from app_state import updateState
from api import update_api
from utils.plugin_utils import (
@@ -113,7 +113,7 @@ class plugin_manager:
schd = self._cache["schedules"].get(prefix)
if schd:
# note the last time the scheduled plugin run was executed
schd.last_run = timeNowTZ()
schd.last_run = timeNowUTC(as_string=False)
# ===============================================================================
# Handling of user initialized front-end events
@@ -166,14 +166,14 @@ class plugin_manager:
if len(executed_events) > 0 and executed_events:
executed_events_message = ', '.join(executed_events)
mylog('minimal', ['[check_and_run_user_event] INFO: Executed events: ', executed_events_message])
write_notification(f"[Ad-hoc events] Events executed: {executed_events_message}", "interrupt", timeNowDB())
write_notification(f"[Ad-hoc events] Events executed: {executed_events_message}", "interrupt", timeNowUTC())
return
# -------------------------------------------------------------------------------
def handle_run(self, runType):
mylog('minimal', ['[', timeNowDB(), '] START Run: ', runType])
mylog('minimal', ['[', timeNowUTC(), '] START Run: ', runType])
# run the plugin
for plugin in self.all_plugins:
@@ -190,15 +190,13 @@ class plugin_manager:
pluginsStates={pluginName: current_plugin_state.get(pluginName, {})}
)
mylog('minimal', ['[', timeNowDB(), '] END Run: ', runType])
mylog('minimal', ['[', timeNowUTC(), '] END Run: ', runType])
return
# -------------------------------------------------------------------------------
def handle_test(self, runType):
mylog("minimal", ["[", timeNowTZ(), "] [Test] START Test: ", runType])
mylog('minimal', ['[', timeNowDB(), '] [Test] START Test: ', runType])
mylog('minimal', ['[', timeNowUTC(), '] [Test] START Test: ', runType])
# Prepare test samples
sample_json = json.loads(
@@ -235,7 +233,7 @@ class plugin_manager:
"""
sql = self.db.sql
plugin_states = {}
now_str = timeNowDB()
now_str = timeNowUTC()
if plugin_name: # Only compute for single plugin
sql.execute(
@@ -799,7 +797,7 @@ def process_plugin_events(db, plugin, plugEventsArr):
if isMissing:
# if wasn't missing before, mark as changed
if tmpObj.status != "missing-in-last-scan":
tmpObj.changed = timeNowDB()
tmpObj.changed = timeNowUTC()
tmpObj.status = "missing-in-last-scan"
# mylog('debug', [f'[Plugins] Missing from last scan (PrimaryID | SecondaryID): {tmpObj.primaryId} | {tmpObj.secondaryId}'])

View File

@@ -3,7 +3,7 @@ import os
import re
import ipaddress
from helper import get_setting_value, check_IP_format
from utils.datetime_utils import timeNowDB, normalizeTimeStamp
from utils.datetime_utils import timeNowUTC, normalizeTimeStamp
from logger import mylog, Logger
from const import vendorsPath, vendorsPathNewest, sql_generateGuid, NULL_EQUIVALENTS
from models.device_instance import DeviceInstance
@@ -227,7 +227,7 @@ def update_devLastConnection_from_CurrentScan(db):
Update devLastConnection to current time for all devices seen in CurrentScan.
"""
sql = db.sql
startTime = timeNowDB()
startTime = timeNowUTC()
mylog("debug", f"[Update Devices] - Updating devLastConnection to {startTime}")
sql.execute(f"""
@@ -600,7 +600,7 @@ def print_scan_stats(db):
# -------------------------------------------------------------------------------
def create_new_devices(db):
sql = db.sql # TO-DO
startTime = timeNowDB()
startTime = timeNowUTC()
# Insert events for new devices from CurrentScan (not yet in Devices)
@@ -1109,7 +1109,7 @@ def update_devices_names(pm):
# --- Step 3: Log last checked time ---
# After resolving names, update last checked
pm.plugin_checks = {"DIGSCAN": timeNowDB(), "AVAHISCAN": timeNowDB(), "NSLOOKUP": timeNowDB(), "NBTSCAN": timeNowDB()}
pm.plugin_checks = {"DIGSCAN": timeNowUTC(), "AVAHISCAN": timeNowUTC(), "NSLOOKUP": timeNowUTC(), "NBTSCAN": timeNowUTC()}
# -------------------------------------------------------------------------------

View File

@@ -14,11 +14,11 @@ from scan.device_handling import (
)
from helper import get_setting_value
from db.db_helper import print_table_schema
from utils.datetime_utils import timeNowDB
from utils.datetime_utils import timeNowUTC
from logger import mylog, Logger
from messaging.reporting import skip_repeated_notifications
from messaging.in_app import update_unread_notifications_count
from const import NULL_EQUIVALENTS, NULL_EQUIVALENTS_SQL
from const import NULL_EQUIVALENTS_SQL
# Make sure log level is initialized correctly
@@ -167,7 +167,7 @@ def create_sessions_snapshot(db):
# -------------------------------------------------------------------------------
def insert_events(db):
sql = db.sql # TO-DO
startTime = timeNowDB()
startTime = timeNowUTC()
# Check device down
mylog("debug", "[Events] - 1 - Devices down")
@@ -234,7 +234,7 @@ def insert_events(db):
def insertOnlineHistory(db):
sql = db.sql # TO-DO: Implement sql object
scanTimestamp = timeNowDB()
scanTimestamp = timeNowUTC()
# Query to fetch all relevant device counts in one go
query = """

View File

@@ -3,7 +3,7 @@
import datetime
from logger import mylog
import conf
from utils.datetime_utils import timeNowUTC
# -------------------------------------------------------------------------------
@@ -28,11 +28,11 @@ class schedule_class:
# Initialize the last run time if never run before
if self.last_run == 0:
self.last_run = (
datetime.datetime.now(conf.tz) - datetime.timedelta(days=365)
timeNowUTC(as_string=False) - datetime.timedelta(days=365)
).replace(microsecond=0)
# get the current time with the currently specified timezone
nowTime = datetime.datetime.now(conf.tz).replace(microsecond=0)
nowTime = timeNowUTC(as_string=False)
# Run the schedule if the current time is past the schedule time we saved last time and
# (maybe the following check is unnecessary)

View File

@@ -20,47 +20,40 @@ DATETIME_PATTERN = "%Y-%m-%d %H:%M:%S"
DATETIME_REGEX = re.compile(r'^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}$')
def timeNowTZ():
if conf.tz:
return datetime.datetime.now(conf.tz).replace(microsecond=0)
else:
return datetime.datetime.now().replace(microsecond=0)
def timeNow():
return datetime.datetime.now().replace(microsecond=0)
# ⚠️ CRITICAL: ALL database timestamps MUST be stored in UTC
# This is the SINGLE SOURCE OF TRUTH for current time in NetAlertX
# Use timeNowUTC() for DB writes (returns UTC string by default)
# Use timeNowUTC(as_string=False) for datetime operations (scheduling, comparisons, logging)
def timeNowUTC(as_string=True):
"""
Return the current time in UTC.
This is the ONLY function that calls datetime.datetime.now() in the entire codebase.
All timestamps stored in the database MUST use UTC format.
Args:
as_string (bool): If True, returns formatted string for DB storage.
If False, returns datetime object for operations.
Returns:
str: UTC timestamp as 'YYYY-MM-DD HH:MM:SS' when as_string=True
datetime.datetime: UTC datetime object when as_string=False
Examples:
timeNowUTC() → '2025-11-04 07:09:11' (for DB writes)
timeNowUTC(as_string=False) → datetime.datetime(2025, 11, 4, 7, 9, 11, tzinfo=UTC)
"""
utc_now = datetime.datetime.now(datetime.UTC).replace(microsecond=0)
return utc_now.strftime(DATETIME_PATTERN) if as_string else utc_now
def get_timezone_offset():
now = datetime.datetime.now(conf.tz)
offset_hours = now.utcoffset().total_seconds() / 3600
now = timeNowUTC(as_string=False).replace(tzinfo=conf.tz) if conf.tz else timeNowUTC(as_string=False)
offset_hours = now.utcoffset().total_seconds() / 3600 if now.utcoffset() else 0
offset_formatted = "{:+03d}:{:02d}".format(int(offset_hours), int((offset_hours % 1) * 60))
return offset_formatted
def timeNowDB(local=True):
"""
Return the current time (local or UTC) as ISO 8601 for DB storage.
Safe for SQLite, PostgreSQL, etc.
Example local: '2025-11-04 18:09:11'
Example UTC: '2025-11-04 07:09:11'
"""
if local:
try:
if isinstance(conf.tz, datetime.tzinfo):
tz = conf.tz
elif conf.tz:
tz = ZoneInfo(conf.tz)
else:
tz = None
except Exception:
tz = None
return datetime.datetime.now(tz).strftime(DATETIME_PATTERN)
else:
return datetime.datetime.now(datetime.UTC).strftime(DATETIME_PATTERN)
# -------------------------------------------------------------------------------
# Date and time methods
# -------------------------------------------------------------------------------
@@ -113,7 +106,10 @@ def normalizeTimeStamp(inputTimeStamp):
# -------------------------------------------------------------------------------------------
def format_date_iso(date_val: str) -> Optional[str]:
"""Ensures a date string from DB is returned as a proper ISO string with TZ."""
"""Ensures a date string from DB is returned as a proper ISO string with TZ.
Assumes DB timestamps are stored in UTC and converts them to user's configured timezone.
"""
if not date_val:
return None
@@ -125,10 +121,14 @@ def format_date_iso(date_val: str) -> Optional[str]:
else:
dt = date_val
# 2. If it has no timezone, ATTACH (don't convert) your config TZ
# 2. If it has no timezone, assume it's UTC (our DB storage format)
# then CONVERT to user's configured timezone
if dt.tzinfo is None:
# Mark as UTC first
dt = dt.replace(tzinfo=datetime.UTC)
# Convert to user's timezone
target_tz = conf.tz if isinstance(conf.tz, datetime.tzinfo) else ZoneInfo(conf.tz)
dt = dt.replace(tzinfo=target_tz)
dt = dt.astimezone(target_tz)
# 3. Return the string. .isoformat() will now include the +11:00 or +10:00
return dt.isoformat()
@@ -151,7 +151,7 @@ def format_event_date(date_str: str, event_type: str) -> str:
# -------------------------------------------------------------------------------------------
def ensure_datetime(dt: Union[str, datetime.datetime, None]) -> datetime.datetime:
if dt is None:
return timeNowTZ()
return timeNowUTC(as_string=False)
if isinstance(dt, str):
return datetime.datetime.fromisoformat(dt)
return dt
@@ -172,6 +172,10 @@ def parse_datetime(dt_str):
def format_date(date_str: str) -> str:
"""Format a date string from DB for display.
Assumes DB timestamps are stored in UTC and converts them to user's configured timezone.
"""
try:
if not date_str:
return ""
@@ -179,25 +183,21 @@ def format_date(date_str: str) -> str:
date_str = re.sub(r"\s+", " ", str(date_str).strip())
dt = parse_datetime(date_str)
if dt.tzinfo is None:
if isinstance(conf.tz, str):
dt = dt.replace(tzinfo=ZoneInfo(conf.tz))
else:
dt = dt.replace(tzinfo=conf.tz)
if not dt:
return f"invalid:{repr(date_str)}"
# If the DB has no timezone, we tell Python what it IS,
# we don't CONVERT it.
# If the DB timestamp has no timezone, assume it's UTC (our storage format)
# then CONVERT to user's configured timezone
if dt.tzinfo is None:
# Option A: If the DB time is already AEDT, use AEDT.
# Option B: Use conf.tz if that is your 'source of truth'
dt = dt.replace(tzinfo=conf.tz)
# Mark as UTC first
dt = dt.replace(tzinfo=datetime.UTC)
# Convert to user's timezone
if isinstance(conf.tz, str):
dt = dt.astimezone(ZoneInfo(conf.tz))
else:
dt = dt.astimezone(conf.tz)
# IMPORTANT: Return the ISO format of the object AS IS.
# Calling .astimezone() here triggers a conversion to the
# System Local Time , which is causing your shift.
# Return ISO format with timezone offset
return dt.isoformat()
except Exception as e:
@@ -207,7 +207,7 @@ def format_date(date_str: str) -> str:
def format_date_diff(date1, date2, tz_name):
"""
Return difference between two datetimes as 'Xd HH:MM'.
Uses app timezone if datetime is naive.
Assumes DB timestamps are stored in UTC and converts them to user's configured timezone.
date2 can be None (uses now).
"""
# Get timezone from settings
@@ -215,20 +215,22 @@ def format_date_diff(date1, date2, tz_name):
def parse_dt(dt):
if dt is None:
return datetime.datetime.now(tz)
# Get current UTC time and convert to user's timezone
return timeNowUTC(as_string=False).astimezone(tz)
if isinstance(dt, str):
try:
dt_parsed = email.utils.parsedate_to_datetime(dt)
except (ValueError, TypeError):
# fallback: parse ISO string
dt_parsed = datetime.datetime.fromisoformat(dt)
# convert naive GMT/UTC to app timezone
# If naive (no timezone), assume it's UTC from DB, then convert to user's timezone
if dt_parsed.tzinfo is None:
dt_parsed = tz.localize(dt_parsed)
dt_parsed = dt_parsed.replace(tzinfo=datetime.UTC).astimezone(tz)
else:
dt_parsed = dt_parsed.astimezone(tz)
return dt_parsed
return dt if dt.tzinfo else tz.localize(dt)
# If datetime object without timezone, assume it's UTC from DB
return dt.astimezone(tz) if dt.tzinfo else dt.replace(tzinfo=datetime.UTC).astimezone(tz)
dt1 = parse_dt(date1)
dt2 = parse_dt(date2)

View File

@@ -8,7 +8,7 @@ INSTALL_PATH = os.getenv('NETALERTX_APP', '/app')
sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
from helper import get_setting_value # noqa: E402 [flake8 lint suppression]
from utils.datetime_utils import timeNowDB # noqa: E402 [flake8 lint suppression]
from utils.datetime_utils import timeNowUTC # noqa: E402 [flake8 lint suppression]
from api_server.api_server_start import app # noqa: E402 [flake8 lint suppression]
@@ -43,7 +43,7 @@ def b64(sql: str) -> str:
# -----------------------------
def test_dbquery_create_device(client, api_token, test_mac):
now = timeNowDB()
now = timeNowUTC()
sql = f"""
INSERT INTO Devices (devMac, devName, devVendor, devOwner, devFirstConnection, devLastConnection, devLastIP)

View File

@@ -8,7 +8,7 @@ INSTALL_PATH = os.getenv('NETALERTX_APP', '/app')
sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
from helper import get_setting_value # noqa: E402 [flake8 lint suppression]
from utils.datetime_utils import timeNowTZ # noqa: E402 [flake8 lint suppression]
from utils.datetime_utils import timeNowUTC # noqa: E402 [flake8 lint suppression]
from api_server.api_server_start import app # noqa: E402 [flake8 lint suppression]
@@ -38,7 +38,7 @@ def create_event(client, api_token, mac, event="UnitTest Event", days_old=None):
# Calculate the event_time if days_old is given
if days_old is not None:
event_time = timeNowTZ() - timedelta(days=days_old)
event_time = timeNowUTC(as_string=False) - timedelta(days=days_old)
# ISO 8601 string
payload["event_time"] = event_time.isoformat()
@@ -140,7 +140,7 @@ def test_delete_events_dynamic_days(client, api_token, test_mac):
# Count pre-existing events younger than 30 days for test_mac
# These will remain after delete operation
from datetime import datetime
thirty_days_ago = timeNowTZ() - timedelta(days=30)
thirty_days_ago = timeNowUTC(as_string=False) - timedelta(days=30)
initial_younger_count = 0
for ev in initial_events:
if ev.get("eve_MAC") == test_mac and ev.get("eve_DateTime"):

View File

@@ -8,7 +8,7 @@ INSTALL_PATH = os.getenv('NETALERTX_APP', '/app')
sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
from helper import get_setting_value # noqa: E402 [flake8 lint suppression]
from utils.datetime_utils import timeNowTZ, timeNowDB # noqa: E402 [flake8 lint suppression]
from utils.datetime_utils import timeNowUTC # noqa: E402 [flake8 lint suppression]
from api_server.api_server_start import app # noqa: E402 [flake8 lint suppression]
@@ -50,7 +50,7 @@ def test_create_session(client, api_token, test_mac):
payload = {
"mac": test_mac,
"ip": "192.168.1.100",
"start_time": timeNowDB(),
"start_time": timeNowUTC(),
"event_type_conn": "Connected",
"event_type_disc": "Disconnected"
}
@@ -65,7 +65,7 @@ def test_list_sessions(client, api_token, test_mac):
payload = {
"mac": test_mac,
"ip": "192.168.1.100",
"start_time": timeNowDB()
"start_time": timeNowUTC()
}
client.post("/sessions/create", json=payload, headers=auth_headers(api_token))
@@ -82,7 +82,7 @@ def test_device_sessions_by_period(client, api_token, test_mac):
payload = {
"mac": test_mac,
"ip": "192.168.1.200",
"start_time": timeNowDB()
"start_time": timeNowUTC()
}
resp_create = client.post("/sessions/create", json=payload, headers=auth_headers(api_token))
assert resp_create.status_code == 200
@@ -117,7 +117,7 @@ def test_device_session_events(client, api_token, test_mac):
payload = {
"mac": test_mac,
"ip": "192.168.1.250",
"start_time": timeNowDB()
"start_time": timeNowUTC()
}
resp_create = client.post(
"/sessions/create",
@@ -166,7 +166,7 @@ def test_delete_session(client, api_token, test_mac):
payload = {
"mac": test_mac,
"ip": "192.168.1.100",
"start_time": timeNowDB()
"start_time": timeNowUTC()
}
client.post("/sessions/create", json=payload, headers=auth_headers(api_token))
@@ -188,7 +188,7 @@ def test_get_sessions_calendar(client, api_token, test_mac):
Cleans up test sessions after test.
"""
# --- Setup: create two sessions for the test MAC ---
now = timeNowTZ()
now = datetime.now()
start1 = (now - timedelta(days=2)).isoformat(timespec="seconds")
end1 = (now - timedelta(days=1, hours=20)).isoformat(timespec="seconds")

View File

@@ -0,0 +1,238 @@
"""
Unit tests for database timestamp migration to UTC.
Tests verify that:
- Migration detects version correctly from Settings table
- Fresh installs skip migration (empty VERSION)
- Upgrades from v26.2.6+ skip migration (already UTC)
- Upgrades from <v26.2.6 run migration (convert local→UTC)
- Migration handles timezone offset calculations correctly
- Migration is idempotent (safe to run multiple times)
"""
import sys
import os
import pytest
import sqlite3
import tempfile
INSTALL_PATH = os.getenv('NETALERTX_APP', '/app')
sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
from db.db_upgrade import migrate_timestamps_to_utc, is_timestamps_in_utc # noqa: E402
from utils.datetime_utils import timeNowUTC # noqa: E402
@pytest.fixture
def temp_db():
"""Create a temporary database for testing"""
fd, db_path = tempfile.mkstemp(suffix='.db')
os.close(fd)
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
# Create Settings table
cursor.execute("""
CREATE TABLE Settings (
setKey TEXT PRIMARY KEY,
setValue TEXT
)
""")
# Create Devices table with timestamp columns
cursor.execute("""
CREATE TABLE Devices (
devMac TEXT PRIMARY KEY,
devFirstConnection TEXT,
devLastConnection TEXT,
devLastNotification TEXT
)
""")
conn.commit()
yield cursor, conn
conn.close()
os.unlink(db_path)
class TestTimestampMigration:
"""Test suite for UTC timestamp migration"""
def test_migrate_fresh_install_skips_migration(self, temp_db):
"""Test that fresh install with empty VERSION skips migration"""
cursor, conn = temp_db
# Empty Settings table (fresh install)
result = migrate_timestamps_to_utc(cursor)
assert result is True
# Should return without error
def test_migrate_unknown_version_skips_migration(self, temp_db):
"""Test that 'unknown' VERSION skips migration"""
cursor, conn = temp_db
# Insert 'unknown' VERSION
cursor.execute("INSERT INTO Settings (setKey, setValue) VALUES ('VERSION', 'unknown')")
conn.commit()
result = migrate_timestamps_to_utc(cursor)
assert result is True
def test_migrate_version_26_2_6_skips_migration(self, temp_db):
"""Test that v26.2.6 skips migration (already UTC)"""
cursor, conn = temp_db
# Insert VERSION v26.2.6
cursor.execute("INSERT INTO Settings (setKey, setValue) VALUES ('VERSION', '26.2.6')")
conn.commit()
result = migrate_timestamps_to_utc(cursor)
assert result is True
def test_migrate_version_27_0_0_skips_migration(self, temp_db):
"""Test that v27.0.0 skips migration (newer version)"""
cursor, conn = temp_db
# Insert VERSION v27.0.0
cursor.execute("INSERT INTO Settings (setKey, setValue) VALUES ('VERSION', '27.0.0')")
conn.commit()
result = migrate_timestamps_to_utc(cursor)
assert result is True
def test_migrate_version_26_3_0_skips_migration(self, temp_db):
"""Test that v26.3.0 skips migration (newer minor version)"""
cursor, conn = temp_db
# Insert VERSION v26.3.0
cursor.execute("INSERT INTO Settings (setKey, setValue) VALUES ('VERSION', '26.3.0')")
conn.commit()
result = migrate_timestamps_to_utc(cursor)
assert result is True
def test_migrate_old_version_triggers_migration(self, temp_db):
"""Test that v25.x.x triggers migration"""
cursor, conn = temp_db
# Insert VERSION v25.1.0
cursor.execute("INSERT INTO Settings (setKey, setValue) VALUES ('VERSION', '25.1.0')")
# Insert a sample device with timestamp
now_str = timeNowUTC()
cursor.execute("""
INSERT INTO Devices (devMac, devFirstConnection, devLastConnection)
VALUES ('aa:bb:cc:dd:ee:ff', ?, ?)
""", (now_str, now_str))
conn.commit()
result = migrate_timestamps_to_utc(cursor)
assert result is True
def test_migrate_version_with_v_prefix(self, temp_db):
"""Test that version string with 'v' prefix is parsed correctly"""
cursor, conn = temp_db
# Insert VERSION with 'v' prefix
cursor.execute("INSERT INTO Settings (setKey, setValue) VALUES ('VERSION', 'v26.2.6')")
conn.commit()
result = migrate_timestamps_to_utc(cursor)
assert result is True
def test_migrate_malformed_version_uses_fallback(self, temp_db):
"""Test that malformed version string uses timestamp detection fallback"""
cursor, conn = temp_db
# Insert malformed VERSION
cursor.execute("INSERT INTO Settings (setKey, setValue) VALUES ('VERSION', 'invalid.version')")
conn.commit()
result = migrate_timestamps_to_utc(cursor)
# Should not crash, should use fallback detection
assert result is True
def test_migrate_version_26_2_5_triggers_migration(self, temp_db):
"""Test that v26.2.5 (one patch before UTC) triggers migration"""
cursor, conn = temp_db
# Insert VERSION v26.2.5
cursor.execute("INSERT INTO Settings (setKey, setValue) VALUES ('VERSION', '26.2.5')")
# Insert sample device
now_str = timeNowUTC()
cursor.execute("""
INSERT INTO Devices (devMac, devFirstConnection)
VALUES ('aa:bb:cc:dd:ee:ff', ?)
""", (now_str,))
conn.commit()
result = migrate_timestamps_to_utc(cursor)
assert result is True
def test_migrate_does_not_crash_on_empty_devices_table(self, temp_db):
"""Test that migration handles empty Devices table gracefully"""
cursor, conn = temp_db
# Insert old VERSION but no devices
cursor.execute("INSERT INTO Settings (setKey, setValue) VALUES ('VERSION', '25.1.0')")
conn.commit()
result = migrate_timestamps_to_utc(cursor)
assert result is True
def test_is_timestamps_in_utc_returns_true_for_empty_table(self, temp_db):
"""Test that is_timestamps_in_utc returns True for empty Devices table"""
cursor, conn = temp_db
result = is_timestamps_in_utc(cursor)
assert result is True
def test_is_timestamps_in_utc_detects_utc_timestamps(self, temp_db):
"""Test that is_timestamps_in_utc correctly identifies UTC timestamps"""
cursor, conn = temp_db
# Insert devices with UTC timestamps
now_str = timeNowUTC()
cursor.execute("""
INSERT INTO Devices (devMac, devFirstConnection)
VALUES ('aa:bb:cc:dd:ee:ff', ?)
""", (now_str,))
conn.commit()
result = is_timestamps_in_utc(cursor)
# Should return False for naive timestamps (no timezone marker)
# This is expected behavior - naive timestamps need migration check
assert result is False
def test_is_timestamps_in_utc_detects_timezone_markers(self, temp_db):
"""Test that is_timestamps_in_utc detects timestamps with timezone info"""
cursor, conn = temp_db
# Insert device with timezone marker
timestamp_with_tz = "2026-02-11 11:37:02+00:00"
cursor.execute("""
INSERT INTO Devices (devMac, devFirstConnection)
VALUES ('aa:bb:cc:dd:ee:ff', ?)
""", (timestamp_with_tz,))
conn.commit()
result = is_timestamps_in_utc(cursor)
# Should detect timezone marker
assert result is True

View File

@@ -0,0 +1,106 @@
"""
Unit tests for datetime_utils.py UTC timestamp functions.
Tests verify that:
- timeNowUTC() returns correct formats (string and datetime object)
- All timestamps are in UTC timezone
- No other functions call datetime.datetime.now() (single source of truth)
"""
import sys
import os
import datetime
import pytest
INSTALL_PATH = os.getenv('NETALERTX_APP', '/app')
sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
from utils.datetime_utils import timeNowUTC, DATETIME_PATTERN # noqa: E402
class TestTimeNowUTC:
"""Test suite for timeNowUTC() function"""
def test_timeNowUTC_returns_string_by_default(self):
"""Test that timeNowUTC() returns a string by default"""
result = timeNowUTC()
assert isinstance(result, str)
assert len(result) == 19 # 'YYYY-MM-DD HH:MM:SS' format
def test_timeNowUTC_string_format(self):
"""Test that timeNowUTC() returns correct string format"""
result = timeNowUTC()
# Verify format matches DATETIME_PATTERN
try:
datetime.datetime.strptime(result, DATETIME_PATTERN)
except ValueError:
pytest.fail(f"timeNowUTC() returned invalid format: {result}")
def test_timeNowUTC_returns_datetime_object_when_false(self):
"""Test that timeNowUTC(as_string=False) returns datetime object"""
result = timeNowUTC(as_string=False)
assert isinstance(result, datetime.datetime)
def test_timeNowUTC_datetime_has_UTC_timezone(self):
"""Test that datetime object has UTC timezone"""
result = timeNowUTC(as_string=False)
assert result.tzinfo is datetime.UTC or result.tzinfo is not None
def test_timeNowUTC_datetime_no_microseconds(self):
"""Test that datetime object has microseconds set to 0"""
result = timeNowUTC(as_string=False)
assert result.microsecond == 0
def test_timeNowUTC_consistency_between_modes(self):
"""Test that string and datetime modes return consistent values"""
dt_obj = timeNowUTC(as_string=False)
str_result = timeNowUTC(as_string=True)
# Convert datetime to string and compare (within 1 second tolerance)
dt_str = dt_obj.strftime(DATETIME_PATTERN)
# Parse both to compare timestamps
t1 = datetime.datetime.strptime(dt_str, DATETIME_PATTERN)
t2 = datetime.datetime.strptime(str_result, DATETIME_PATTERN)
diff = abs((t1 - t2).total_seconds())
assert diff <= 1 # Allow 1 second difference
def test_timeNowUTC_is_actually_UTC(self):
"""Test that timeNowUTC() returns actual UTC time, not local time"""
utc_now = datetime.datetime.now(datetime.UTC).replace(microsecond=0)
result = timeNowUTC(as_string=False)
# Should be within 1 second
diff = abs((utc_now - result).total_seconds())
assert diff <= 1
def test_timeNowUTC_string_matches_datetime_conversion(self):
"""Test that string result matches datetime object conversion"""
dt_obj = timeNowUTC(as_string=False)
str_result = timeNowUTC(as_string=True)
# Convert datetime to string using same format
expected = dt_obj.strftime(DATETIME_PATTERN)
# Should be same or within 1 second
t1 = datetime.datetime.strptime(expected, DATETIME_PATTERN)
t2 = datetime.datetime.strptime(str_result, DATETIME_PATTERN)
diff = abs((t1 - t2).total_seconds())
assert diff <= 1
def test_timeNowUTC_explicit_true_parameter(self):
"""Test that timeNowUTC(as_string=True) explicitly returns string"""
result = timeNowUTC(as_string=True)
assert isinstance(result, str)
def test_timeNowUTC_multiple_calls_increase(self):
"""Test that subsequent calls return increasing timestamps"""
import time
t1_str = timeNowUTC()
time.sleep(0.1)
t2_str = timeNowUTC()
t1 = datetime.datetime.strptime(t1_str, DATETIME_PATTERN)
t2 = datetime.datetime.strptime(t2_str, DATETIME_PATTERN)
assert t2 >= t1