mirror of
https://github.com/jokob-sk/NetAlertX.git
synced 2025-12-07 09:36:05 -08:00
Compare commits
6 Commits
ea8cea16c5
...
1fd8d97d56
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1fd8d97d56 | ||
|
|
286d5555d2 | ||
|
|
57096a9258 | ||
|
|
c08eb1dbba | ||
|
|
746f1a8922 | ||
|
|
0845b7f445 |
2
.github/copilot-instructions.md
vendored
2
.github/copilot-instructions.md
vendored
@@ -42,7 +42,7 @@ Backend loop phases (see `server/__main__.py` and `server/plugin.py`): `once`, `
|
||||
## Conventions & helpers to reuse
|
||||
- Settings: add/modify via `ccd()` in `server/initialise.py` or per‑plugin manifest. Never hardcode ports or secrets; use `get_setting_value()`.
|
||||
- Logging: use `logger.mylog(level, [message])`; levels: none/minimal/verbose/debug/trace.
|
||||
- Time/MAC/strings: `helper.py` (`timeNowTZ`, `normalize_mac`, sanitizers). Validate MACs before DB writes.
|
||||
- Time/MAC/strings: `helper.py` (`timeNowDB`, `normalize_mac`, sanitizers). Validate MACs before DB writes.
|
||||
- DB helpers: prefer `server/db/db_helper.py` functions (e.g., `get_table_json`, device condition helpers) over raw SQL in new paths.
|
||||
|
||||
## Dev workflow (devcontainer)
|
||||
|
||||
@@ -47,11 +47,18 @@ In Notification Processing settings, you can specify blanket rules. These allow
|
||||
3. A filter to allow you to set device-specific exceptions to New devices being added to the app.
|
||||
4. A filter to allow you to set device-specific exceptions to generated Events.
|
||||
|
||||
## Ignoring devices 🔕
|
||||
## Ignoring devices 💻
|
||||
|
||||

|
||||
|
||||
You can completely ignore detected devices globally. This could be because your instance detects docker containers, you want to ignore devices from a specific manufacturer via MAC rules or you want to ignore devices on a specific IP range.
|
||||
|
||||
1. Ignored MACs (`NEWDEV_ignored_MACs`) - List of MACs to ignore.
|
||||
2. Ignored IPs (`NEWDEV_ignored_IPs`) - List of IPs to ignore.
|
||||
2. Ignored IPs (`NEWDEV_ignored_IPs`) - List of IPs to ignore.
|
||||
|
||||
## Ignoring notifications 🔕
|
||||
|
||||
You can filter out unwanted notifications globally. This could be because of a misbehaving device (GoogleNest/GoogleHub (See also [ARPSAN docs and the `--exclude-broadcast` flag](https://github.com/jokob-sk/NetAlertX/tree/main/front/plugins/arp_scan#ip-flipping-on-google-nest-devices))) which flips between IP addresses, or because you want to ignore new device notifications of a certain pattern.
|
||||
|
||||
1. Events Filter (`NTFPRCS_event_condition`) - filter out Events from notifications.
|
||||
2. New Devices Filter (`NTFPRCS_new_dev_condition`) - filter out New Devices from notifications, but log and keep a new device in the system.
|
||||
@@ -13,16 +13,22 @@ function renderLogArea($params) {
|
||||
$textAreaCssClass = isset($params['textAreaCssClass']) ? $params['textAreaCssClass'] : '';
|
||||
$buttons = isset($params['buttons']) ? $params['buttons'] : [];
|
||||
$content = "";
|
||||
$fileSize = 0;
|
||||
|
||||
if (filesize($filePath) > 2000000) {
|
||||
$content = file_get_contents($filePath, false, null, -2000000);
|
||||
if (file_exists($filePath) && is_readable($filePath)) {
|
||||
$fileSize = filesize($filePath);
|
||||
if ($fileSize > 2000000) {
|
||||
$content = file_get_contents($filePath, false, null, max(0, $fileSize - 2000000));
|
||||
} else {
|
||||
$content = file_get_contents($filePath);
|
||||
}
|
||||
} else {
|
||||
$content = file_get_contents($filePath);
|
||||
$content = "⚠️ File not found or not readable: $filePath";
|
||||
}
|
||||
|
||||
// Prepare the download button HTML if filePath starts with /app
|
||||
$downloadButtonHtml = '';
|
||||
if (strpos($filePath, '/app') === 0) {
|
||||
if (strpos($filePath, '/app') === 0 && file_exists($filePath)) {
|
||||
$downloadButtonHtml = '
|
||||
<span class="span-padding">
|
||||
<a href="' . htmlspecialchars(str_replace('/app/log/', '/php/server/query_logs.php?file=', $filePath)) . '" target="_blank">
|
||||
@@ -34,13 +40,7 @@ function renderLogArea($params) {
|
||||
// Prepare buttons HTML
|
||||
$buttonsHtml = '';
|
||||
$totalButtons = count($buttons);
|
||||
if ($totalButtons > 0) {
|
||||
$colClass = 12 / $totalButtons;
|
||||
// Use $colClass in your HTML generation or further logic
|
||||
} else {
|
||||
// Handle case where $buttons array is empty
|
||||
$colClass = 12;
|
||||
}
|
||||
$colClass = $totalButtons > 0 ? (12 / $totalButtons) : 12;
|
||||
|
||||
foreach ($buttons as $button) {
|
||||
$labelStringCode = isset($button['labelStringCode']) ? $button['labelStringCode'] : '';
|
||||
@@ -52,8 +52,7 @@ function renderLogArea($params) {
|
||||
</div>';
|
||||
}
|
||||
|
||||
|
||||
// Render the log area HTML
|
||||
// Render HTML
|
||||
$html = '
|
||||
<div class="log-area box box-solid box-primary">
|
||||
<div class="row logs-row col-sm-12 col-xs-12">
|
||||
@@ -63,7 +62,7 @@ function renderLogArea($params) {
|
||||
</div>
|
||||
<div class="row logs-row">
|
||||
<div class="log-file col-sm-6 col-xs-12">' . htmlspecialchars($filePath) . '
|
||||
<div class="logs-size">' . number_format((filesize($filePath) / 1000000), 2, ",", ".") . ' MB'
|
||||
<div class="logs-size">' . number_format(($fileSize / 1000000), 2, ",", ".") . ' MB'
|
||||
. $downloadButtonHtml .
|
||||
'</div>
|
||||
</div>
|
||||
|
||||
@@ -16,7 +16,8 @@ import conf
|
||||
from const import confFileName, logPath
|
||||
from plugin_helper import Plugin_Objects
|
||||
from logger import mylog, Logger, append_line_to_file
|
||||
from helper import timeNowDB, get_setting_value
|
||||
from helper import get_setting_value
|
||||
from utils.datetime_utils import timeNowDB
|
||||
from models.notification_instance import NotificationInstance
|
||||
from database import DB
|
||||
from pytz import timezone
|
||||
|
||||
@@ -25,7 +25,8 @@ import conf
|
||||
from const import confFileName, logPath
|
||||
from plugin_helper import Plugin_Objects
|
||||
from logger import mylog, Logger, append_line_to_file
|
||||
from helper import timeNowDB, get_setting_value, hide_email
|
||||
from helper import get_setting_value, hide_email
|
||||
from utils.datetime_utils import timeNowDB
|
||||
from models.notification_instance import NotificationInstance
|
||||
from database import DB
|
||||
from pytz import timezone
|
||||
|
||||
@@ -23,8 +23,9 @@ from const import confFileName, logPath
|
||||
from plugin_utils import getPluginObject
|
||||
from plugin_helper import Plugin_Objects
|
||||
from logger import mylog, Logger
|
||||
from helper import timeNowDB, get_setting_value, bytes_to_string, \
|
||||
from helper import get_setting_value, bytes_to_string, \
|
||||
sanitize_string, normalize_string
|
||||
from utils.datetime_utils import timeNowDB
|
||||
from database import DB, get_device_stats
|
||||
|
||||
|
||||
|
||||
@@ -19,7 +19,8 @@ import conf
|
||||
from const import confFileName, logPath
|
||||
from plugin_helper import Plugin_Objects, handleEmpty
|
||||
from logger import mylog, Logger, append_line_to_file
|
||||
from helper import timeNowDB, get_setting_value
|
||||
from helper import get_setting_value
|
||||
from utils.datetime_utils import timeNowDB
|
||||
from models.notification_instance import NotificationInstance
|
||||
from database import DB
|
||||
from pytz import timezone
|
||||
|
||||
@@ -11,7 +11,8 @@ sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
|
||||
|
||||
from plugin_helper import Plugin_Objects, handleEmpty # noqa: E402
|
||||
from logger import mylog, Logger # noqa: E402
|
||||
from helper import timeNowDB, get_setting_value, hide_string # noqa: E402
|
||||
from helper import get_setting_value, hide_string # noqa: E402
|
||||
from utils.datetime_utils import timeNowDB
|
||||
from models.notification_instance import NotificationInstance # noqa: E402
|
||||
from database import DB # noqa: E402
|
||||
import conf
|
||||
|
||||
@@ -19,7 +19,8 @@ import conf
|
||||
from const import confFileName, logPath
|
||||
from plugin_helper import Plugin_Objects, handleEmpty
|
||||
from logger import mylog, Logger, append_line_to_file
|
||||
from helper import timeNowDB, get_setting_value, hide_string
|
||||
from helper import get_setting_value, hide_string
|
||||
from utils.datetime_utils import timeNowDB
|
||||
from models.notification_instance import NotificationInstance
|
||||
from database import DB
|
||||
from pytz import timezone
|
||||
|
||||
@@ -16,7 +16,8 @@ import conf
|
||||
from const import confFileName, logPath
|
||||
from plugin_helper import Plugin_Objects
|
||||
from logger import mylog, Logger, append_line_to_file
|
||||
from helper import timeNowDB, get_setting_value
|
||||
from helper import get_setting_value
|
||||
from utils.datetime_utils import timeNowDB
|
||||
from models.notification_instance import NotificationInstance
|
||||
from database import DB
|
||||
from pytz import timezone
|
||||
|
||||
@@ -22,7 +22,8 @@ import conf
|
||||
from const import logPath, confFileName
|
||||
from plugin_helper import Plugin_Objects, handleEmpty
|
||||
from logger import mylog, Logger, append_line_to_file
|
||||
from helper import timeNowDB, get_setting_value, hide_string, write_file
|
||||
from helper import get_setting_value, hide_string, write_file
|
||||
from utils.datetime_utils import timeNowDB
|
||||
from models.notification_instance import NotificationInstance
|
||||
from database import DB
|
||||
from pytz import timezone
|
||||
|
||||
@@ -20,8 +20,9 @@ sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
|
||||
|
||||
from plugin_helper import Plugin_Object, Plugin_Objects, decodeBase64
|
||||
from logger import mylog, Logger, append_line_to_file
|
||||
from helper import timeNowDB, check_IP_format, get_setting_value
|
||||
from helper import check_IP_format, get_setting_value
|
||||
from const import logPath, applicationPath, fullDbPath
|
||||
from utils.datetime_utils import timeNowDB
|
||||
import conf
|
||||
from pytz import timezone
|
||||
|
||||
|
||||
@@ -13,7 +13,8 @@ sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
|
||||
|
||||
from plugin_helper import Plugin_Objects
|
||||
from logger import mylog, Logger, append_line_to_file
|
||||
from helper import timeNowDB, get_setting_value
|
||||
from helper import get_setting_value
|
||||
from utils.datetime_utils import timeNowDB
|
||||
import conf
|
||||
from pytz import timezone
|
||||
from const import logPath
|
||||
|
||||
@@ -14,7 +14,8 @@ sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
|
||||
|
||||
from plugin_helper import Plugin_Object, Plugin_Objects, decodeBase64
|
||||
from logger import mylog, Logger, append_line_to_file
|
||||
from helper import timeNowDB, get_setting_value
|
||||
from helper import get_setting_value
|
||||
from utils.datetime_utils import timeNowDB
|
||||
from const import logPath, applicationPath
|
||||
import conf
|
||||
from pytz import timezone
|
||||
|
||||
@@ -149,7 +149,7 @@
|
||||
"description": [
|
||||
{
|
||||
"language_code": "en_us",
|
||||
"string": "You can specify a SQL where condition to filter out Events from notifications. For example <code>AND devLastIP NOT LIKE '192.168.3.%'</code> will always exclude New Device notifications for all devices with the IP starting with <code>192.168.3.%</code>."
|
||||
"string": "You can specify a SQL where condition to filter out Events from notifications. For example <code>AND devLastIP NOT LIKE '192.168.3.%'</code> will always exclude any Event notifications for all devices with the IP starting with <code>192.168.3.%</code>."
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
@@ -11,7 +11,8 @@ INSTALL_PATH = "/app"
|
||||
sys.path.append(f"{INSTALL_PATH}/front/plugins")
|
||||
sys.path.append(f'{INSTALL_PATH}/server')
|
||||
|
||||
from logger import mylog, Logger, timeNowDB
|
||||
from logger import mylog, Logger
|
||||
from utils.datetime_utils import timeNowDB
|
||||
from const import confFileName, default_tz
|
||||
|
||||
#-------------------------------------------------------------------------------
|
||||
|
||||
@@ -18,7 +18,8 @@ from plugin_helper import Plugin_Object, Plugin_Objects, decodeBase64
|
||||
from plugin_utils import get_plugins_configs, decode_and_rename_files
|
||||
from logger import mylog, Logger
|
||||
from const import pluginsPath, fullDbPath, logPath
|
||||
from helper import timeNowDB, get_setting_value
|
||||
from helper import get_setting_value
|
||||
from utils.datetime_utils import timeNowDB
|
||||
from crypto_utils import encrypt_data
|
||||
from messaging.in_app import write_notification
|
||||
import conf
|
||||
|
||||
@@ -26,7 +26,8 @@ from pathlib import Path
|
||||
import conf
|
||||
from const import *
|
||||
from logger import mylog
|
||||
from helper import filePermissions, timeNowTZ, get_setting_value
|
||||
from helper import filePermissions, get_setting_value
|
||||
from utils.datetime_utils import timeNowTZ
|
||||
from app_state import updateState
|
||||
from api import update_api
|
||||
from scan.session_events import process_scan
|
||||
|
||||
@@ -7,7 +7,8 @@ import datetime
|
||||
import conf
|
||||
from const import (apiPath, sql_appevents, sql_devices_all, sql_events_pending_alert, sql_settings, sql_plugins_events, sql_plugins_history, sql_plugins_objects,sql_language_strings, sql_notifications_all, sql_online_history, sql_devices_tiles, sql_devices_filters)
|
||||
from logger import mylog
|
||||
from helper import write_file, get_setting_value, timeNowTZ
|
||||
from helper import write_file, get_setting_value
|
||||
from utils.datetime_utils import timeNowTZ
|
||||
from app_state import updateState
|
||||
from models.user_events_queue_instance import UserEventsQueueInstance
|
||||
from messaging.in_app import write_notification
|
||||
|
||||
@@ -14,7 +14,8 @@ INSTALL_PATH="/app"
|
||||
sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
|
||||
|
||||
from database import get_temp_db_connection
|
||||
from helper import is_random_mac, format_date, get_setting_value, timeNowDB
|
||||
from helper import is_random_mac, get_setting_value
|
||||
from utils.datetime_utils import timeNowDB, format_date
|
||||
from db.db_helper import row_to_json, get_date_from_period
|
||||
|
||||
# --------------------------
|
||||
|
||||
@@ -19,8 +19,9 @@ INSTALL_PATH="/app"
|
||||
sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
|
||||
|
||||
from database import get_temp_db_connection
|
||||
from helper import is_random_mac, format_date, get_setting_value
|
||||
from helper import is_random_mac, get_setting_value
|
||||
from db.db_helper import get_table_json, get_device_condition_by_status
|
||||
from utils.datetime_utils import format_date
|
||||
|
||||
|
||||
# --------------------------
|
||||
|
||||
@@ -14,8 +14,9 @@ INSTALL_PATH="/app"
|
||||
sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
|
||||
|
||||
from database import get_temp_db_connection
|
||||
from helper import is_random_mac, format_date, get_setting_value, format_date_iso, format_event_date, mylog, ensure_datetime
|
||||
from helper import is_random_mac, get_setting_value, mylog
|
||||
from db.db_helper import row_to_json, get_date_from_period
|
||||
from utils.datetime_utils import format_date, format_date_iso, format_event_date, ensure_datetime
|
||||
|
||||
|
||||
# --------------------------
|
||||
|
||||
@@ -14,7 +14,8 @@ INSTALL_PATH="/app"
|
||||
sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
|
||||
|
||||
from database import get_temp_db_connection
|
||||
from helper import is_random_mac, format_date, get_setting_value
|
||||
from helper import is_random_mac, get_setting_value
|
||||
from utils.datetime_utils import format_date
|
||||
|
||||
|
||||
# --------------------------------------------------
|
||||
|
||||
@@ -16,8 +16,9 @@ INSTALL_PATH="/app"
|
||||
sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
|
||||
|
||||
from database import get_temp_db_connection
|
||||
from helper import is_random_mac, format_date, get_setting_value, format_date_iso, format_event_date, mylog, format_date_diff, format_ip_long, parse_datetime
|
||||
from helper import is_random_mac, get_setting_value, mylog, format_ip_long
|
||||
from db.db_helper import row_to_json, get_date_from_period
|
||||
from utils.datetime_utils import format_date_iso, format_event_date, format_date_diff, parse_datetime, format_date
|
||||
|
||||
|
||||
# --------------------------
|
||||
@@ -207,6 +208,7 @@ def get_device_sessions(mac, period):
|
||||
cur.execute(sql, (mac,))
|
||||
rows = cur.fetchall()
|
||||
conn.close()
|
||||
tz_name = get_setting_value("TIMEZONE") or "UTC"
|
||||
|
||||
table_data = {"data": []}
|
||||
|
||||
@@ -229,9 +231,9 @@ def get_device_sessions(mac, period):
|
||||
if row["ses_EventTypeConnection"] in ("<missing event>", None) or row["ses_EventTypeDisconnection"] in ("<missing event>", None):
|
||||
dur = "..."
|
||||
elif row["ses_StillConnected"]:
|
||||
dur = format_date_diff(row["ses_DateTimeConnection"], None)["text"]
|
||||
dur = format_date_diff(row["ses_DateTimeConnection"], None, tz_name)["text"]
|
||||
else:
|
||||
dur = format_date_diff(row["ses_DateTimeConnection"], row["ses_DateTimeDisconnection"])["text"]
|
||||
dur = format_date_diff(row["ses_DateTimeConnection"], row["ses_DateTimeDisconnection"], tz_name)["text"]
|
||||
|
||||
# Additional Info
|
||||
info = row["ses_AdditionalInfo"]
|
||||
@@ -349,11 +351,11 @@ def get_session_events(event_type, period_date):
|
||||
if event_type in ("sessions", "missing"):
|
||||
# Duration
|
||||
if row[5] and row[6]:
|
||||
delta = format_date_diff(row[5], row[6])
|
||||
delta = format_date_diff(row[5], row[6], tz_name)
|
||||
row[7] = delta["text"]
|
||||
row[8] = int(delta["total_minutes"] * 60) # seconds
|
||||
elif row[12] == 1:
|
||||
delta = format_date_diff(row[5], None)
|
||||
delta = format_date_diff(row[5], None, tz_name)
|
||||
row[7] = delta["text"]
|
||||
row[8] = int(delta["total_minutes"] * 60) # seconds
|
||||
else:
|
||||
|
||||
@@ -2,7 +2,8 @@ import os
|
||||
import base64
|
||||
from flask import jsonify, request
|
||||
from logger import mylog
|
||||
from helper import get_setting_value, timeNowDB
|
||||
from helper import get_setting_value
|
||||
from utils.datetime_utils import timeNowDB
|
||||
from messaging.in_app import write_notification
|
||||
|
||||
INSTALL_PATH = "/app"
|
||||
|
||||
@@ -4,7 +4,8 @@ import json
|
||||
import conf
|
||||
from const import *
|
||||
from logger import mylog, logResult
|
||||
from helper import timeNowDB, timeNow, checkNewVersion
|
||||
from helper import checkNewVersion
|
||||
from utils.datetime_utils import timeNowDB, timeNow
|
||||
|
||||
# Register NetAlertX directories
|
||||
INSTALL_PATH="/app"
|
||||
|
||||
149
server/helper.py
149
server/helper.py
@@ -7,7 +7,6 @@ import os
|
||||
import re
|
||||
import unicodedata
|
||||
import subprocess
|
||||
from typing import Union
|
||||
import pytz
|
||||
from pytz import timezone
|
||||
import json
|
||||
@@ -29,144 +28,6 @@ from logger import mylog, logResult
|
||||
# Register NetAlertX directories
|
||||
INSTALL_PATH="/app"
|
||||
|
||||
#-------------------------------------------------------------------------------
|
||||
# DateTime
|
||||
#-------------------------------------------------------------------------------
|
||||
def timeNowTZ():
|
||||
if conf.tz:
|
||||
return datetime.datetime.now(conf.tz).replace(microsecond=0)
|
||||
else:
|
||||
return datetime.datetime.now().replace(microsecond=0)
|
||||
|
||||
def timeNow():
|
||||
return datetime.datetime.now().replace(microsecond=0)
|
||||
|
||||
def get_timezone_offset():
|
||||
now = datetime.datetime.now(conf.tz)
|
||||
offset_hours = now.utcoffset().total_seconds() / 3600
|
||||
offset_formatted = "{:+03d}:{:02d}".format(int(offset_hours), int((offset_hours % 1) * 60))
|
||||
return offset_formatted
|
||||
|
||||
def timeNowDB(local=True):
|
||||
"""
|
||||
Return the current time (local or UTC) as ISO 8601 for DB storage.
|
||||
Safe for SQLite, PostgreSQL, etc.
|
||||
|
||||
Example local: '2025-11-04 18:09:11'
|
||||
Example UTC: '2025-11-04 07:09:11'
|
||||
"""
|
||||
if local:
|
||||
try:
|
||||
if isinstance(conf.tz, datetime.tzinfo):
|
||||
tz = conf.tz
|
||||
elif conf.tz:
|
||||
tz = ZoneInfo(conf.tz)
|
||||
else:
|
||||
tz = None
|
||||
except Exception:
|
||||
tz = None
|
||||
return datetime.datetime.now(tz).strftime('%Y-%m-%d %H:%M:%S')
|
||||
else:
|
||||
return datetime.datetime.now(datetime.UTC).strftime('%Y-%m-%d %H:%M:%S')
|
||||
|
||||
|
||||
#-------------------------------------------------------------------------------
|
||||
# Date and time methods
|
||||
#-------------------------------------------------------------------------------
|
||||
|
||||
# -------------------------------------------------------------------------------------------
|
||||
def format_date_iso(date1: str) -> str:
|
||||
"""Return ISO 8601 string for a date or None if empty"""
|
||||
if date1 is None:
|
||||
return None
|
||||
dt = datetime.datetime.fromisoformat(date1) if isinstance(date1, str) else date1
|
||||
return dt.isoformat()
|
||||
|
||||
# -------------------------------------------------------------------------------------------
|
||||
def format_event_date(date_str: str, event_type: str) -> str:
|
||||
"""Format event date with fallback rules."""
|
||||
if date_str:
|
||||
return format_date(date_str)
|
||||
elif event_type == "<missing event>":
|
||||
return "<missing event>"
|
||||
else:
|
||||
return "<still connected>"
|
||||
|
||||
# -------------------------------------------------------------------------------------------
|
||||
def ensure_datetime(dt: Union[str, datetime.datetime, None]) -> datetime.datetime:
|
||||
if dt is None:
|
||||
return timeNowTZ()
|
||||
if isinstance(dt, str):
|
||||
return datetime.datetime.fromisoformat(dt)
|
||||
return dt
|
||||
|
||||
|
||||
def parse_datetime(dt_str):
|
||||
if not dt_str:
|
||||
return None
|
||||
try:
|
||||
# Try ISO8601 first
|
||||
return datetime.datetime.fromisoformat(dt_str)
|
||||
except ValueError:
|
||||
# Try RFC1123 / HTTP format
|
||||
try:
|
||||
return datetime.datetime.strptime(dt_str, '%a, %d %b %Y %H:%M:%S GMT')
|
||||
except ValueError:
|
||||
return None
|
||||
|
||||
def format_date(date_str: str) -> str:
|
||||
try:
|
||||
dt = parse_datetime(date_str)
|
||||
if dt.tzinfo is None:
|
||||
# Set timezone if missing — change to timezone.utc if you prefer UTC
|
||||
now = datetime.datetime.now(conf.tz)
|
||||
dt = dt.replace(tzinfo=now.astimezone().tzinfo)
|
||||
return dt.astimezone().isoformat()
|
||||
except Exception:
|
||||
return "invalid"
|
||||
|
||||
def format_date_diff(date1, date2):
|
||||
"""
|
||||
Return difference between two datetimes as 'Xd HH:MM'.
|
||||
Uses app timezone if datetime is naive.
|
||||
date2 can be None (uses now).
|
||||
"""
|
||||
# Get timezone from settings
|
||||
tz_name = get_setting_value("TIMEZONE") or "UTC"
|
||||
tz = pytz.timezone(tz_name)
|
||||
|
||||
def parse_dt(dt):
|
||||
if dt is None:
|
||||
return datetime.datetime.now(tz)
|
||||
if isinstance(dt, str):
|
||||
try:
|
||||
dt_parsed = email.utils.parsedate_to_datetime(dt)
|
||||
except Exception:
|
||||
# fallback: parse ISO string
|
||||
dt_parsed = datetime.datetime.fromisoformat(dt)
|
||||
# convert naive GMT/UTC to app timezone
|
||||
if dt_parsed.tzinfo is None:
|
||||
dt_parsed = tz.localize(dt_parsed)
|
||||
else:
|
||||
dt_parsed = dt_parsed.astimezone(tz)
|
||||
return dt_parsed
|
||||
return dt if dt.tzinfo else tz.localize(dt)
|
||||
|
||||
dt1 = parse_dt(date1)
|
||||
dt2 = parse_dt(date2)
|
||||
|
||||
delta = dt2 - dt1
|
||||
total_minutes = int(delta.total_seconds() // 60)
|
||||
days, rem_minutes = divmod(total_minutes, 1440) # 1440 mins in a day
|
||||
hours, minutes = divmod(rem_minutes, 60)
|
||||
|
||||
return {
|
||||
"text": f"{days}d {hours:02}:{minutes:02}",
|
||||
"days": days,
|
||||
"hours": hours,
|
||||
"minutes": minutes,
|
||||
"total_minutes": total_minutes
|
||||
}
|
||||
|
||||
#-------------------------------------------------------------------------------
|
||||
# File system permission handling
|
||||
@@ -381,10 +242,12 @@ def get_setting_value(key):
|
||||
value = setting_value_to_python_type(set_type, set_value)
|
||||
else:
|
||||
value = setting_value_to_python_type(set_type, str(set_value))
|
||||
|
||||
SETTINGS_SECONDARYCACHE[key] = value
|
||||
|
||||
return value
|
||||
|
||||
# Otherwise fall back to retrive from json
|
||||
# Otherwise fall back to retrieve from json
|
||||
setting = get_setting(key)
|
||||
|
||||
if setting is not None:
|
||||
@@ -458,9 +321,6 @@ def setting_value_to_python_type(set_type, set_value):
|
||||
if isinstance(set_value, str):
|
||||
try:
|
||||
value = json.loads(set_value.replace("'", "\""))
|
||||
|
||||
# reverse transformations to all entries
|
||||
value = reverseTransformers(value, transformers)
|
||||
|
||||
except json.JSONDecodeError as e:
|
||||
mylog('none', [f'[setting_value_to_python_type] Error decoding JSON object: {e}'])
|
||||
@@ -470,6 +330,9 @@ def setting_value_to_python_type(set_type, set_value):
|
||||
elif isinstance(set_value, list):
|
||||
value = set_value
|
||||
|
||||
# Always apply transformers (base64, etc.) to array entries
|
||||
value = reverseTransformers(value, transformers)
|
||||
|
||||
elif dataType == 'object' and elementType == 'input':
|
||||
if isinstance(set_value, str):
|
||||
try:
|
||||
|
||||
@@ -12,7 +12,8 @@ import re
|
||||
# Register NetAlertX libraries
|
||||
import conf
|
||||
from const import fullConfPath, applicationPath, fullConfFolder, default_tz
|
||||
from helper import getBuildTimeStamp, fixPermissions, collect_lang_strings, updateSubnets, isJsonObject, setting_value_to_python_type, timeNowDB, get_setting_value, generate_random_string
|
||||
from helper import getBuildTimeStamp, fixPermissions, collect_lang_strings, updateSubnets, isJsonObject, setting_value_to_python_type, get_setting_value, generate_random_string
|
||||
from utils.datetime_utils import timeNowDB
|
||||
from app_state import updateState
|
||||
from logger import mylog
|
||||
from api import update_api
|
||||
|
||||
@@ -7,40 +7,15 @@ import time
|
||||
import logging
|
||||
from zoneinfo import ZoneInfo
|
||||
|
||||
# Register NetAlertX directories
|
||||
INSTALL_PATH="/app"
|
||||
|
||||
sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
|
||||
|
||||
# NetAlertX imports
|
||||
import conf
|
||||
from const import *
|
||||
|
||||
#-------------------------------------------------------------------------------
|
||||
# duplication from helper to avoid circle
|
||||
#-------------------------------------------------------------------------------
|
||||
def timeNowTZ():
|
||||
if conf.tz:
|
||||
return datetime.datetime.now(conf.tz).replace(microsecond=0)
|
||||
else:
|
||||
return datetime.datetime.now().replace(microsecond=0)
|
||||
|
||||
def timeNowDB(local=True):
|
||||
"""
|
||||
Return the current time (local or UTC) as ISO 8601 for DB storage.
|
||||
Safe for SQLite, PostgreSQL, etc.
|
||||
|
||||
Example local: '2025-11-04 18:09:11'
|
||||
Example UTC: '2025-11-04 07:09:11'
|
||||
"""
|
||||
if local:
|
||||
try:
|
||||
if isinstance(conf.tz, datetime.tzinfo):
|
||||
tz = conf.tz
|
||||
elif conf.tz:
|
||||
tz = ZoneInfo(conf.tz)
|
||||
else:
|
||||
tz = None
|
||||
except Exception:
|
||||
tz = None
|
||||
return datetime.datetime.now(tz).strftime('%Y-%m-%d %H:%M:%S')
|
||||
else:
|
||||
return datetime.datetime.now(datetime.UTC).strftime('%Y-%m-%d %H:%M:%S')
|
||||
from utils.datetime_utils import timeNowTZ
|
||||
|
||||
#-------------------------------------------------------------------------------
|
||||
# Map custom debug levels to Python logging levels
|
||||
|
||||
@@ -20,7 +20,8 @@ sys.path.extend([f"{INSTALL_PATH}/server"])
|
||||
import conf
|
||||
from const import applicationPath, logPath, apiPath, confFileName, reportTemplatesPath
|
||||
from logger import logResult, mylog
|
||||
from helper import generate_mac_links, removeDuplicateNewLines, timeNowDB, get_file_content, write_file, get_setting_value, get_timezone_offset
|
||||
from helper import generate_mac_links, removeDuplicateNewLines, get_file_content, write_file, get_setting_value
|
||||
from utils.datetime_utils import timeNowDB
|
||||
|
||||
NOTIFICATION_API_FILE = apiPath + 'user_notifications.json'
|
||||
|
||||
|
||||
@@ -20,9 +20,10 @@ sys.path.extend([f"{INSTALL_PATH}/server"])
|
||||
|
||||
import conf
|
||||
from const import applicationPath, logPath, apiPath, confFileName
|
||||
from helper import get_file_content, write_file, get_timezone_offset, get_setting_value
|
||||
from helper import get_file_content, write_file, get_setting_value
|
||||
from logger import logResult, mylog
|
||||
from db.sql_safe_builder import create_safe_condition_builder
|
||||
from utils.datetime_utils import get_timezone_offset
|
||||
|
||||
#===============================================================================
|
||||
# REPORTING
|
||||
|
||||
@@ -16,12 +16,10 @@ from const import applicationPath, logPath, apiPath, reportTemplatesPath
|
||||
from logger import mylog, Logger
|
||||
from helper import generate_mac_links, \
|
||||
removeDuplicateNewLines, \
|
||||
timeNowDB, \
|
||||
timeNowTZ, \
|
||||
write_file, \
|
||||
get_setting_value, \
|
||||
get_timezone_offset
|
||||
get_setting_value
|
||||
from messaging.in_app import write_notification
|
||||
from utils.datetime_utils import timeNowDB, get_timezone_offset
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@@ -276,7 +274,7 @@ class NotificationInstance:
|
||||
# Clear the Pending Email flag from all events and devices
|
||||
def clearPendingEmailFlag(self):
|
||||
|
||||
# Clean Pending Alert Events
|
||||
# Clean Pending Alert Events
|
||||
self.db.sql.execute("""
|
||||
UPDATE Devices SET devLastNotification = ?
|
||||
WHERE devMac IN (
|
||||
|
||||
@@ -12,7 +12,8 @@ from collections import namedtuple
|
||||
import conf
|
||||
from const import pluginsPath, logPath, applicationPath, reportTemplatesPath
|
||||
from logger import mylog, Logger
|
||||
from helper import timeNowDB, timeNowTZ, get_file_content, write_file, get_setting, get_setting_value
|
||||
from helper import get_file_content, write_file, get_setting, get_setting_value
|
||||
from utils.datetime_utils import timeNowTZ, timeNowDB
|
||||
from app_state import updateState
|
||||
from api import update_api
|
||||
from plugin_utils import logEventStatusCounts, get_plugin_string, get_plugin_setting_obj, print_plugin_info, list_to_csv, combine_plugin_objects, resolve_wildcards_arr, handle_empty, custom_plugin_decoder, decode_and_rename_files
|
||||
|
||||
@@ -10,7 +10,8 @@ from dateutil import parser
|
||||
INSTALL_PATH="/app"
|
||||
sys.path.extend([f"{INSTALL_PATH}/server"])
|
||||
|
||||
from helper import timeNowDB, timeNowTZ, get_setting_value, check_IP_format
|
||||
from helper import get_setting_value, check_IP_format
|
||||
from utils.datetime_utils import timeNowDB
|
||||
from logger import mylog, Logger
|
||||
from const import vendorsPath, vendorsPathNewest, sql_generateGuid
|
||||
from models.device_instance import DeviceInstance
|
||||
|
||||
@@ -72,6 +72,7 @@ class NameResolver:
|
||||
name += " (IP match)"
|
||||
|
||||
regexes = get_setting_value('NEWDEV_NAME_CLEANUP_REGEX') or []
|
||||
mylog('trace', [f"[cleanDeviceName] applying regexes: {regexes}"])
|
||||
for rgx in regexes:
|
||||
mylog('trace', [f"[cleanDeviceName] applying regex: {rgx}"])
|
||||
name = re.sub(rgx, "", name)
|
||||
|
||||
@@ -6,7 +6,8 @@ sys.path.extend([f"{INSTALL_PATH}/server"])
|
||||
|
||||
import conf
|
||||
from scan.device_handling import create_new_devices, print_scan_stats, save_scanned_devices, exclude_ignored_devices, update_devices_data_from_scan
|
||||
from helper import timeNowDB, get_setting_value
|
||||
from helper import get_setting_value
|
||||
from utils.datetime_utils import timeNowDB
|
||||
from db.db_helper import print_table_schema
|
||||
from logger import mylog, Logger
|
||||
from messaging.reporting import skip_repeated_notifications
|
||||
|
||||
159
server/utils/datetime_utils.py
Normal file
159
server/utils/datetime_utils.py
Normal file
@@ -0,0 +1,159 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
import os
|
||||
import pathlib
|
||||
import sys
|
||||
from datetime import datetime
|
||||
import datetime
|
||||
import pytz
|
||||
from pytz import timezone
|
||||
from typing import Union
|
||||
from zoneinfo import ZoneInfo
|
||||
import email.utils
|
||||
|
||||
# Register NetAlertX directories
|
||||
INSTALL_PATH="/app"
|
||||
sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
|
||||
|
||||
import conf
|
||||
from const import *
|
||||
|
||||
|
||||
|
||||
#-------------------------------------------------------------------------------
|
||||
# DateTime
|
||||
#-------------------------------------------------------------------------------
|
||||
def timeNowTZ():
|
||||
if conf.tz:
|
||||
return datetime.datetime.now(conf.tz).replace(microsecond=0)
|
||||
else:
|
||||
return datetime.datetime.now().replace(microsecond=0)
|
||||
|
||||
def timeNow():
|
||||
return datetime.datetime.now().replace(microsecond=0)
|
||||
|
||||
def get_timezone_offset():
|
||||
now = datetime.datetime.now(conf.tz)
|
||||
offset_hours = now.utcoffset().total_seconds() / 3600
|
||||
offset_formatted = "{:+03d}:{:02d}".format(int(offset_hours), int((offset_hours % 1) * 60))
|
||||
return offset_formatted
|
||||
|
||||
def timeNowDB(local=True):
|
||||
"""
|
||||
Return the current time (local or UTC) as ISO 8601 for DB storage.
|
||||
Safe for SQLite, PostgreSQL, etc.
|
||||
|
||||
Example local: '2025-11-04 18:09:11'
|
||||
Example UTC: '2025-11-04 07:09:11'
|
||||
"""
|
||||
if local:
|
||||
try:
|
||||
if isinstance(conf.tz, datetime.tzinfo):
|
||||
tz = conf.tz
|
||||
elif conf.tz:
|
||||
tz = ZoneInfo(conf.tz)
|
||||
else:
|
||||
tz = None
|
||||
except Exception:
|
||||
tz = None
|
||||
return datetime.datetime.now(tz).strftime('%Y-%m-%d %H:%M:%S')
|
||||
else:
|
||||
return datetime.datetime.now(datetime.UTC).strftime('%Y-%m-%d %H:%M:%S')
|
||||
|
||||
|
||||
#-------------------------------------------------------------------------------
|
||||
# Date and time methods
|
||||
#-------------------------------------------------------------------------------
|
||||
|
||||
# -------------------------------------------------------------------------------------------
|
||||
def format_date_iso(date1: str) -> str:
|
||||
"""Return ISO 8601 string for a date or None if empty"""
|
||||
if date1 is None:
|
||||
return None
|
||||
dt = datetime.datetime.fromisoformat(date1) if isinstance(date1, str) else date1
|
||||
return dt.isoformat()
|
||||
|
||||
# -------------------------------------------------------------------------------------------
|
||||
def format_event_date(date_str: str, event_type: str) -> str:
|
||||
"""Format event date with fallback rules."""
|
||||
if date_str:
|
||||
return format_date(date_str)
|
||||
elif event_type == "<missing event>":
|
||||
return "<missing event>"
|
||||
else:
|
||||
return "<still connected>"
|
||||
|
||||
# -------------------------------------------------------------------------------------------
|
||||
def ensure_datetime(dt: Union[str, datetime.datetime, None]) -> datetime.datetime:
|
||||
if dt is None:
|
||||
return timeNowTZ()
|
||||
if isinstance(dt, str):
|
||||
return datetime.datetime.fromisoformat(dt)
|
||||
return dt
|
||||
|
||||
|
||||
def parse_datetime(dt_str):
|
||||
if not dt_str:
|
||||
return None
|
||||
try:
|
||||
# Try ISO8601 first
|
||||
return datetime.datetime.fromisoformat(dt_str)
|
||||
except ValueError:
|
||||
# Try RFC1123 / HTTP format
|
||||
try:
|
||||
return datetime.datetime.strptime(dt_str, '%a, %d %b %Y %H:%M:%S GMT')
|
||||
except ValueError:
|
||||
return None
|
||||
|
||||
def format_date(date_str: str) -> str:
|
||||
try:
|
||||
dt = parse_datetime(date_str)
|
||||
if dt.tzinfo is None:
|
||||
# Set timezone if missing — change to timezone.utc if you prefer UTC
|
||||
now = datetime.datetime.now(conf.tz)
|
||||
dt = dt.replace(tzinfo=now.astimezone().tzinfo)
|
||||
return dt.astimezone().isoformat()
|
||||
except (ValueError, AttributeError, TypeError):
|
||||
return "invalid"
|
||||
|
||||
def format_date_diff(date1, date2, tz_name):
|
||||
"""
|
||||
Return difference between two datetimes as 'Xd HH:MM'.
|
||||
Uses app timezone if datetime is naive.
|
||||
date2 can be None (uses now).
|
||||
"""
|
||||
# Get timezone from settings
|
||||
tz = pytz.timezone(tz_name)
|
||||
|
||||
def parse_dt(dt):
|
||||
if dt is None:
|
||||
return datetime.datetime.now(tz)
|
||||
if isinstance(dt, str):
|
||||
try:
|
||||
dt_parsed = email.utils.parsedate_to_datetime(dt)
|
||||
except (ValueError, TypeError):
|
||||
# fallback: parse ISO string
|
||||
dt_parsed = datetime.datetime.fromisoformat(dt)
|
||||
# convert naive GMT/UTC to app timezone
|
||||
if dt_parsed.tzinfo is None:
|
||||
dt_parsed = tz.localize(dt_parsed)
|
||||
else:
|
||||
dt_parsed = dt_parsed.astimezone(tz)
|
||||
return dt_parsed
|
||||
return dt if dt.tzinfo else tz.localize(dt)
|
||||
|
||||
dt1 = parse_dt(date1)
|
||||
dt2 = parse_dt(date2)
|
||||
|
||||
delta = dt2 - dt1
|
||||
total_minutes = int(delta.total_seconds() // 60)
|
||||
days, rem_minutes = divmod(total_minutes, 1440) # 1440 mins in a day
|
||||
hours, minutes = divmod(rem_minutes, 60)
|
||||
|
||||
return {
|
||||
"text": f"{days}d {hours:02}:{minutes:02}",
|
||||
"days": days,
|
||||
"hours": hours,
|
||||
"minutes": minutes,
|
||||
"total_minutes": total_minutes
|
||||
}
|
||||
@@ -6,7 +6,8 @@ import pytest
|
||||
INSTALL_PATH = "/app"
|
||||
sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
|
||||
|
||||
from helper import get_setting_value, timeNowDB
|
||||
from helper import get_setting_value
|
||||
from utils.datetime_utils import timeNowDB
|
||||
from api_server.api_server_start import app
|
||||
|
||||
|
||||
|
||||
@@ -10,7 +10,8 @@ from datetime import datetime, timedelta
|
||||
INSTALL_PATH = "/app"
|
||||
sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
|
||||
|
||||
from helper import timeNowTZ, get_setting_value
|
||||
from helper import get_setting_value
|
||||
from utils.datetime_utils import timeNowTZ
|
||||
from api_server.api_server_start import app
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
|
||||
@@ -10,7 +10,8 @@ from datetime import datetime, timedelta
|
||||
INSTALL_PATH = "/app"
|
||||
sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
|
||||
|
||||
from helper import timeNowDB, timeNowTZ, get_setting_value
|
||||
from helper import get_setting_value
|
||||
from utils.datetime_utils import timeNowTZ, timeNowDB
|
||||
from api_server.api_server_start import app
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
|
||||
Reference in New Issue
Block a user