BE: linting fixes 2

Signed-off-by: jokob-sk <jokob.sk@gmail.com>
This commit is contained in:
jokob-sk
2025-11-22 20:43:36 +11:00
parent 5c14b34a8b
commit ebeb7a07af
63 changed files with 124 additions and 113 deletions

View File

@@ -134,7 +134,7 @@ def callomada(myargs):
omada_output = ""
retries = 2
while omada_output == "" and retries > 1:
while omada_output == "" and retries > 0:
retries = retries - 1
try:
mf = io.StringIO()
@@ -183,44 +183,64 @@ def add_uplink(
sadevices_linksbymac,
port_byswitchmac_byclientmac,
):
# Ensure switch_mac exists in device_data_bymac
# Ensure switch exists
if switch_mac not in device_data_bymac:
mylog("none", [f"[{pluginName}] switch_mac '{switch_mac}' not found in device_data_bymac"])
return
# Ensure SWITCH_AP key exists in the dictionary
if SWITCH_AP not in device_data_bymac[switch_mac]:
mylog("none", [f"[{pluginName}] Missing key '{SWITCH_AP}' in device_data_bymac[{switch_mac}]"])
dev_switch = device_data_bymac[switch_mac]
# Ensure list is long enough to contain SWITCH_AP index
if len(dev_switch) <= SWITCH_AP:
mylog("none", [f"[{pluginName}] SWITCH_AP index {SWITCH_AP} missing in record for {switch_mac}"])
return
# Check if uplink should be added
if device_data_bymac[switch_mac][SWITCH_AP] in [None, "null"]:
device_data_bymac[switch_mac][SWITCH_AP] = uplink_mac
# Add uplink only if empty
if dev_switch[SWITCH_AP] in (None, "null"):
dev_switch[SWITCH_AP] = uplink_mac
# Ensure uplink_mac exists in device_data_bymac
# Validate uplink_mac exists
if uplink_mac not in device_data_bymac:
mylog("none", [f"[{pluginName}] uplink_mac '{uplink_mac}' not found in device_data_bymac"])
return
# Determine port to uplink
if (
device_data_bymac[switch_mac].get(TYPE) == "Switch" and device_data_bymac[uplink_mac].get(TYPE) == "Switch"
):
dev_uplink = device_data_bymac[uplink_mac]
# Get TYPE safely
switch_type = dev_switch[TYPE] if len(dev_switch) > TYPE else None
uplink_type = dev_uplink[TYPE] if len(dev_uplink) > TYPE else None
# Switch-to-switch link → use port mapping
if switch_type == "Switch" and uplink_type == "Switch":
port_to_uplink = port_byswitchmac_byclientmac.get(switch_mac, {}).get(uplink_mac)
if port_to_uplink is None:
mylog("none", [f"[{pluginName}] Missing port info for switch_mac '{switch_mac}' and uplink_mac '{uplink_mac}'"])
mylog("none", [
f"[{pluginName}] Missing port info for {switch_mac}{uplink_mac}"
])
return
else:
port_to_uplink = device_data_bymac[uplink_mac].get(PORT_SSID)
# Other device types → read PORT_SSID index
if len(dev_uplink) <= PORT_SSID:
mylog("none", [
f"[{pluginName}] PORT_SSID index missing for uplink {uplink_mac}"
])
return
port_to_uplink = dev_uplink[PORT_SSID]
# Assign port to switch_mac
device_data_bymac[switch_mac][PORT_SSID] = port_to_uplink
# Assign port to switch
if len(dev_switch) > PORT_SSID:
dev_switch[PORT_SSID] = port_to_uplink
else:
mylog("none", [
f"[{pluginName}] PORT_SSID index missing in switch {switch_mac}"
])
# Recursively add uplinks for linked devices
# Process children recursively
for link in sadevices_linksbymac.get(switch_mac, []):
if (
link in device_data_bymac and device_data_bymac[link].get(SWITCH_AP) in [None, "null"] and device_data_bymac[switch_mac].get(TYPE) == "Switch"
link in device_data_bymac and len(device_data_bymac[link]) > SWITCH_AP and device_data_bymac[link][SWITCH_AP] in (None, "null") and len(dev_switch) > TYPE
):
if dev_switch[TYPE] == "Switch":
add_uplink(
switch_mac,
link,

View File

@@ -296,7 +296,7 @@ class OmadaAPI:
OmadaHelper.verbose(f"{method} request error: {str(ex)}")
return OmadaHelper.response("error", f"{method} request failed to endpoint '{endpoint}' with error: {str(ex)}")
def authenticate(self) -> Dict[str, any]:
def authenticate(self) -> Dict[str, Any]:
"""Make an endpoint request to get access token."""
OmadaHelper.verbose("Starting authentication process")

4
ruff.toml Normal file
View File

@@ -0,0 +1,4 @@
[lint]
select = ["E", "F"] # or whatever you are using
# Add E402 so Ruff knows the noqa is legitimate
extend-select = ["E402"]

View File

@@ -30,6 +30,7 @@ def parse_timestamp(date_str):
dt = datetime.strptime(clean_date, '%Y/%m/%d %H:%M:%S')
return int(dt.timestamp())
except Exception as e:
if logger:
logger.error(f"Failed to parse timestamp: {date_str} ({e})")
return None
@@ -83,9 +84,8 @@ def get_lease_file(hostname, username, password=None, key_filename=None, port=22
# Clean up the output by removing the command echo and shell prompts
lines = output.split('\n')
# Remove first line (command echo) and any lines containing shell prompts
# cleaned_lines = [line for line in lines
# if not line.strip().startswith(command.strip()) and not line.strip().endswith('> ') and not line.strip().endswith('# ')]
cmd = command.strip()
cleaned_lines = []

View File

@@ -136,23 +136,6 @@ def ensure_views(sql) -> bool:
"""CREATE VIEW Sessions_Devices AS SELECT * FROM Sessions LEFT JOIN "Devices" ON ses_MAC = devMac;"""
)
sql.execute(""" CREATE VIEW IF NOT EXISTS LatestEventsPerMAC AS
WITH RankedEvents AS (
SELECT
e.*,
ROW_NUMBER() OVER (PARTITION BY e.eve_MAC ORDER BY e.eve_DateTime DESC) AS row_num
FROM Events AS e
)
SELECT
e.*,
d.*,
c.*
FROM RankedEvents AS e
LEFT JOIN Devices AS d ON e.eve_MAC = d.devMac
INNER JOIN CurrentScan AS c ON e.eve_MAC = c.cur_MAC
WHERE e.row_num = 1;
""")
# handling the Convert_Events_to_Sessions / Sessions screens
sql.execute("""DROP VIEW IF EXISTS Convert_Events_to_Sessions;""")
sql.execute("""CREATE VIEW Convert_Events_to_Sessions AS SELECT EVE1.eve_MAC,

View File

@@ -624,6 +624,11 @@ def extract_ip_addresses(text):
# Helper function to determine if a MAC address is random
def is_random_mac(mac):
"""Determine if a MAC address is random, respecting user-defined prefixes not to mark as random."""
# Validate input
if not mac or len(mac) < 2:
return False
# Check if second character matches "2", "6", "A", "E" (case insensitive)
is_random = mac[1].upper() in ["2", "6", "A", "E"]
@@ -631,7 +636,7 @@ def is_random_mac(mac):
if is_random:
not_random_prefixes = get_setting_value("UI_NOT_RANDOM_MAC")
for prefix in not_random_prefixes:
if mac.startswith(prefix):
if mac.upper().startswith(prefix.upper()):
is_random = False
break
return is_random

View File

@@ -10,7 +10,7 @@ import re
# Register NetAlertX libraries
import conf
from const import fullConfPath, fullConfFolder, default_tz
from helper import getBuildTimeStampAndVersion, fixPermissions, collect_lang_strings, updateSubnets, generate_random_string
from helper import getBuildTimeStampAndVersion, collect_lang_strings, updateSubnets, generate_random_string
from utils.datetime_utils import timeNowDB
from app_state import updateState
from logger import mylog
@@ -681,9 +681,10 @@ def importConfigs(pm, db, all_plugins):
write_notification(f'[Upgrade]: App upgraded from <code>{prev_version}</code> to \
<code>{new_version}</code> 🚀 Please clear the cache: \
<ol> <li>Click OK below</li> <li>Clear the browser cache (shift + \
browser refresh button)</li> <li> Clear app cache with the <i class="fa-solid fa-rotate"></i> \
(reload) button in the header</li><li>Go to Settings and click Save</li> </ol>\
<ol> <li>Click OK below</li> \
<li>Clear the browser cache (shift + browser refresh button)</li> \
<li> Clear app cache with the <i class="fa-solid fa-rotate"></i> (reload) button in the header</li>\
<li>Go to Settings and click Save</li> </ol>\
Check out new features and what has changed in the \
<a href="https://github.com/jokob-sk/NetAlertX/releases" target="_blank">📓 release notes</a>.',
'interrupt',
@@ -804,8 +805,6 @@ def renameSettings(config_file):
str(config_file) + "_temp", str(config_file)
) # Convert config_file to a string
# ensure correct ownership
fixPermissions()
else:
mylog(
"debug", "[Config] No old setting names found in the file. No changes made."

View File

@@ -199,7 +199,7 @@ def test_devices_by_status(client, api_token, test_mac):
assert "&#9733" in fav_data["title"]
def test_delete_test_devices(client, api_token, test_mac):
def test_delete_test_devices(client, api_token):
# Delete by MAC
resp = client.delete("/devices", json={"macs": ["AA:BB:CC:*"]}, headers=auth_headers(api_token))

View File

@@ -210,7 +210,6 @@ class TestDatabaseParameterSupport(unittest.TestCase):
# This should not cause SQL injection
malicious_input = "'; DROP TABLE test_table; --"
cursor.execute("SELECT * FROM test_table WHERE name = :name", {'name': malicious_input})
# results = cursor.fetchall()
# The table should still exist and be queryable
cursor.execute("SELECT COUNT(*) FROM test_table")

View File

@@ -19,7 +19,8 @@ from messaging.reporting import get_notifications # noqa: E402 [flake8 lint sup
@pytest.fixture
def test_db_path():
path = tempfile.mktemp(suffix=".db")
with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as tmp:
path = tmp.name
yield path
if os.path.exists(path):
os.remove(path)