BE: linting fixes 3

Signed-off-by: jokob-sk <jokob.sk@gmail.com>
This commit is contained in:
jokob-sk
2025-11-22 21:06:03 +11:00
parent ebeb7a07af
commit 872ac1ce0f
12 changed files with 16 additions and 21 deletions

View File

@@ -233,7 +233,6 @@ class sensor_config:
Store the sensor configuration in the global plugin_objects, which tracks sensors based on a unique combination Store the sensor configuration in the global plugin_objects, which tracks sensors based on a unique combination
of attributes including deviceId, sensorName, hash, and MAC. of attributes including deviceId, sensorName, hash, and MAC.
""" """
global plugin_objects
# Add the sensor to the global plugin_objects # Add the sensor to the global plugin_objects
plugin_objects.add_object( plugin_objects.add_object(
@@ -318,7 +317,6 @@ def create_generic_device(mqtt_client, deviceId, deviceName):
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
# Register sensor config on the broker # Register sensor config on the broker
def create_sensor(mqtt_client, deviceId, deviceName, sensorType, sensorName, icon, mac=""): def create_sensor(mqtt_client, deviceId, deviceName, sensorType, sensorName, icon, mac=""):
global mqtt_sensors
# check previous configs # check previous configs
sensorConfig = sensor_config(deviceId, deviceName, sensorType, sensorName, icon, mac) sensorConfig = sensor_config(deviceId, deviceName, sensorType, sensorName, icon, mac)
@@ -429,7 +427,7 @@ def mqtt_create_client():
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def mqtt_start(db): def mqtt_start(db):
global mqtt_client, mqtt_connected_to_broker global mqtt_client
if not mqtt_connected_to_broker: if not mqtt_connected_to_broker:
mqtt_client = mqtt_create_client() mqtt_client = mqtt_create_client()

View File

@@ -1,4 +1,3 @@
#!/usr/bin/env python #!/usr/bin/env python
import json import json

View File

@@ -1,6 +1,4 @@
#!/usr/bin/env python #!/usr/bin/env python
import json import json
import os import os
import sys import sys
@@ -99,7 +97,7 @@ def send(text):
"ut" : 'Open NetAlertX', "ut" : 'Open NetAlertX',
"k" : token, "k" : token,
} }
response = requests.post(url, data=post_fields) response = requests.post(url, data=post_fields, timeout=get_setting_value("PUSHSAFER_RUN_TIMEOUT"))
response_status_code = response.status_code response_status_code = response.status_code
# Check if the request was successful (status code 200) # Check if the request was successful (status code 200)

View File

@@ -22,6 +22,7 @@ from logger import mylog, Logger # noqa: E402 [flake8 lint suppression]
from const import logPath # noqa: E402 [flake8 lint suppression] from const import logPath # noqa: E402 [flake8 lint suppression]
from helper import get_setting_value # noqa: E402 [flake8 lint suppression] from helper import get_setting_value # noqa: E402 [flake8 lint suppression]
import conf # noqa: E402 [flake8 lint suppression] import conf # noqa: E402 [flake8 lint suppression]
from utils.datetime_utils import timeNowDB # noqa: E402 [flake8 lint suppression]
# Make sure the TIMEZONE for logging is correct # Make sure the TIMEZONE for logging is correct
conf.tz = timezone(get_setting_value("TIMEZONE")) conf.tz = timezone(get_setting_value("TIMEZONE"))
@@ -150,7 +151,7 @@ def main():
watched1=freebox["name"], watched1=freebox["name"],
watched2=freebox["operator"], watched2=freebox["operator"],
watched3="Gateway", watched3="Gateway",
watched4=datetime.now, watched4=timeNowDB(),
extra="", extra="",
foreignKey=freebox["mac"], foreignKey=freebox["mac"],
) )

View File

@@ -268,7 +268,7 @@ def main():
if is_mac(entry['mac']): if is_mac(entry['mac']):
# Map to Plugin_Objects fields # Map to Plugin_Objects fields
mylog('verbose', [f'[{pluginName}] found: {entry['name']}|{entry['mac']}|{entry['ip']}']) mylog('verbose', [f"[{pluginName}] found: {entry['name']}|{entry['mac']}|{entry['ip']}"])
plugin_objects.add_object( plugin_objects.add_object(
primaryId=str(entry['mac']), primaryId=str(entry['mac']),
@@ -281,7 +281,7 @@ def main():
foreignKey=str(entry['mac']) foreignKey=str(entry['mac'])
) )
else: else:
mylog('verbose', [f'[{pluginName}] Skipping invalid MAC: {entry['name']}|{entry['mac']}|{entry['ip']}']) mylog('verbose', [f"[{pluginName}] Skipping invalid MAC: {entry['name']}|{entry['mac']}|{entry['ip']}"])
# Write result file for NetAlertX to ingest # Write result file for NetAlertX to ingest
plugin_objects.write_result_file() plugin_objects.write_result_file()

View File

@@ -111,7 +111,6 @@ def update_api(
# ------------------------------------------------------------------------------- # -------------------------------------------------------------------------------
class api_endpoint_class: class api_endpoint_class:
def __init__(self, db, forceUpdate, query, path, is_ad_hoc_user_event=False): def __init__(self, db, forceUpdate, query, path, is_ad_hoc_user_event=False):
global apiEndpoints
current_time = timeNowTZ() current_time = timeNowTZ()
@@ -222,7 +221,7 @@ periodic_write_thread = None
def periodic_write(interval=1): def periodic_write(interval=1):
"""Periodically checks all endpoints for pending writes.""" """Periodically checks all endpoints for pending writes."""
global apiEndpoints
while not stop_event.is_set(): while not stop_event.is_set():
with api_lock: with api_lock:
for endpoint in apiEndpoints: for endpoint in apiEndpoints:

View File

@@ -96,7 +96,7 @@ def delete_unknown_devices():
def export_devices(export_format): def export_devices(export_format):
""" """
Export devices from the Devices table in teh desired format. Export devices from the Devices table in the desired format.
- If `macs` is None → delete ALL devices. - If `macs` is None → delete ALL devices.
- If `macs` is a list → delete only matching MACs (supports wildcard '*'). - If `macs` is a list → delete only matching MACs (supports wildcard '*').
""" """

View File

@@ -364,7 +364,6 @@ class Query(ObjectType):
Collect language strings, optionally filtered by language code and/or string key. Collect language strings, optionally filtered by language code and/or string key.
Caches in memory for performance. Can fallback to 'en_us' if a string is missing. Caches in memory for performance. Can fallback to 'en_us' if a string is missing.
""" """
global _langstrings_cache, _langstrings_cache_mtime
langStrings = [] langStrings = []

View File

@@ -30,7 +30,7 @@ def ensure_column(sql, table: str, column_name: str, column_type: str) -> bool:
if column_name in actual_columns: if column_name in actual_columns:
return True # Already exists return True # Already exists
# Define the expected columns (hardcoded base schema) [v25.5.24] - available in teh default app.db # Define the expected columns (hardcoded base schema) [v25.5.24] - available in the default app.db
expected_columns = [ expected_columns = [
"devMac", "devMac",
"devName", "devName",

View File

@@ -248,8 +248,6 @@ def get_setting_value(key):
Any: The Python-typed setting value, or an empty string if not found. Any: The Python-typed setting value, or an empty string if not found.
""" """
global SETTINGS_SECONDARYCACHE
# Returns empty string if not found # Returns empty string if not found
value = "" value = ""

View File

@@ -94,7 +94,10 @@ class plugin_manager:
# 🔹 CMD also retrieved from cache # 🔹 CMD also retrieved from cache
cmd_setting = self._cache["settings"].get(prefix, {}).get("CMD") cmd_setting = self._cache["settings"].get(prefix, {}).get("CMD")
mylog("debug", f"[Plugins] CMD: {cmd_setting["value"] if cmd_setting else None}")
print_str = cmd_setting["value"] if cmd_setting else None
mylog("debug", f"[Plugins] CMD: {print_str}")
execute_plugin(self.db, self.all_plugins, plugin) execute_plugin(self.db, self.all_plugins, plugin)

View File

@@ -5,7 +5,7 @@ from dateutil import parser
import datetime import datetime
import re import re
import pytz import pytz
from typing import Union from typing import Union, Optional
from zoneinfo import ZoneInfo from zoneinfo import ZoneInfo
import email.utils import email.utils
import conf import conf
@@ -112,9 +112,9 @@ def normalizeTimeStamp(inputTimeStamp):
# ------------------------------------------------------------------------------------------- # -------------------------------------------------------------------------------------------
def format_date_iso(date1: str) -> str: def format_date_iso(date1: str) -> Optional[str]:
"""Return ISO 8601 string for a date or None if empty""" """Return ISO 8601 string for a date or None if empty"""
if date1 is None: if not date1:
return None return None
dt = datetime.datetime.fromisoformat(date1) if isinstance(date1, str) else date1 dt = datetime.datetime.fromisoformat(date1) if isinstance(date1, str) else date1
return dt.isoformat() return dt.isoformat()