api layer v0.2.2 - CSV import/export, refactor
Some checks failed
Code checks / check-url-paths (push) Has been cancelled
docker / docker_dev (push) Has been cancelled
Deploy MkDocs / deploy (push) Has been cancelled

This commit is contained in:
jokob-sk
2025-08-19 07:56:54 +10:00
parent 9c71a8ecab
commit 962bbaa5a1
14 changed files with 738 additions and 284 deletions

View File

@@ -18,7 +18,6 @@ import hashlib
import random
import string
import ipaddress
import dns.resolver
import conf
from const import *
@@ -53,22 +52,6 @@ def get_timezone_offset():
return offset_formatted
#-------------------------------------------------------------------------------
def updateSubnets(scan_subnets):
subnets = []
# multiple interfaces
if type(scan_subnets) is list:
for interface in scan_subnets :
subnets.append(interface)
# one interface only
else:
subnets.append(scan_subnets)
return subnets
#-------------------------------------------------------------------------------
# File system permission handling
#-------------------------------------------------------------------------------
@@ -217,12 +200,6 @@ def get_setting(key):
return None
#-------------------------------------------------------------------------------
# Settings
#-------------------------------------------------------------------------------
#-------------------------------------------------------------------------------
# Return setting value
def get_setting_value(key):
@@ -248,8 +225,6 @@ def get_setting_value(key):
#-------------------------------------------------------------------------------
# Convert the setting value to the corresponding python type
def setting_value_to_python_type(set_type, set_value):
value = '----not processed----'
@@ -341,6 +316,30 @@ def setting_value_to_python_type(set_type, set_value):
return value
#-------------------------------------------------------------------------------
def updateSubnets(scan_subnets):
"""
Normalize scan subnet input into a list of subnets.
Parameters:
scan_subnets (str or list): A single subnet string or a list of subnet strings.
Returns:
list: A list containing all subnets. If a single subnet is provided, it is returned as a single-element list.
"""
subnets = []
# multiple interfaces
if isinstance(scan_subnets, list):
for interface in scan_subnets:
subnets.append(interface)
# one interface only
else:
subnets.append(scan_subnets)
return subnets
#-------------------------------------------------------------------------------
# Reverse transformed values if needed
def reverseTransformers(val, transformers):
@@ -360,41 +359,6 @@ def reverseTransformers(val, transformers):
else:
return reverse_transformers(val, transformers)
#-------------------------------------------------------------------------------
# Generate a WHERE condition for SQLite based on a list of values.
def list_to_where(logical_operator, column_name, condition_operator, values_list):
"""
Generate a WHERE condition for SQLite based on a list of values.
Parameters:
- logical_operator: The logical operator ('AND' or 'OR') to combine conditions.
- column_name: The name of the column to filter on.
- condition_operator: The condition operator ('LIKE', 'NOT LIKE', '=', '!=', etc.).
- values_list: A list of values to be included in the condition.
Returns:
- A string representing the WHERE condition.
"""
# If the list is empty, return an empty string
if not values_list:
return ""
# Replace {s-quote} with single quote in values_list
values_list = [value.replace("{s-quote}", "'") for value in values_list]
# Build the WHERE condition for the first value
condition = f"{column_name} {condition_operator} '{values_list[0]}'"
# Add the rest of the values using the logical operator
for value in values_list[1:]:
condition += f" {logical_operator} {column_name} {condition_operator} '{value}'"
return f'({condition})'
#-------------------------------------------------------------------------------
# IP validation methods
@@ -432,6 +396,19 @@ def check_IP_format (pIP):
# String manipulation methods
#-------------------------------------------------------------------------------
#-------------------------------------------------------------------------------
def generate_random_string(length):
characters = string.ascii_letters + string.digits
return ''.join(random.choice(characters) for _ in range(length))
#-------------------------------------------------------------------------------
def extract_between_strings(text, start, end):
start_index = text.find(start)
end_index = text.find(end, start_index + len(start))
if start_index != -1 and end_index != -1:
return text[start_index + len(start):end_index]
else:
return ""
#-------------------------------------------------------------------------------
@@ -474,7 +451,6 @@ def removeDuplicateNewLines(text):
return text
#-------------------------------------------------------------------------------
def sanitize_string(input):
if isinstance(input, bytes):
input = input.decode('utf-8')
@@ -482,15 +458,6 @@ def sanitize_string(input):
return input
#-------------------------------------------------------------------------------
def sanitize_SQL_input(val):
if val is None:
return ''
if isinstance(val, str):
return val.replace("'", "_")
return val # Return non-string values as they are
#-------------------------------------------------------------------------------
# Function to normalize the string and remove diacritics
def normalize_string(text):
@@ -501,8 +468,29 @@ def normalize_string(text):
# Filter out diacritics and unwanted characters
return ''.join(c for c in normalized_text if unicodedata.category(c) != 'Mn')
# ------------------------------------------------------------------------------
# MAC and IP helper methods
#-------------------------------------------------------------------------------
# -------------------------------------------------------------------------------------------
def is_random_mac(mac: str) -> bool:
"""Determine if a MAC address is random, respecting user-defined prefixes not to mark as random."""
is_random = mac[1].upper() in ["2", "6", "A", "E"]
# Get prefixes from settings
prefixes = get_setting_value("UI_NOT_RANDOM_MAC")
# If detected as random, make sure it doesn't start with a prefix the user wants to exclude
if is_random:
for prefix in prefixes:
if mac.upper().startswith(prefix.upper()):
is_random = False
break
return is_random
# -------------------------------------------------------------------------------------------
def generate_mac_links (html, deviceUrl):
p = re.compile(r'(?:[0-9a-fA-F]:?){12}')
@@ -514,15 +502,6 @@ def generate_mac_links (html, deviceUrl):
return html
#-------------------------------------------------------------------------------
def extract_between_strings(text, start, end):
start_index = text.find(start)
end_index = text.find(end, start_index + len(start))
if start_index != -1 and end_index != -1:
return text[start_index + len(start):end_index]
else:
return ""
#-------------------------------------------------------------------------------
def extract_mac_addresses(text):
mac_pattern = r"([0-9A-Fa-f]{2}[:-][0-9A-Fa-f]{2}[:-][0-9A-Fa-f]{2}[:-][0-9A-Fa-f]{2}[:-][0-9A-Fa-f]{2}[:-][0-9A-Fa-f]{2})"
@@ -536,11 +515,6 @@ def extract_ip_addresses(text):
return ip_addresses
#-------------------------------------------------------------------------------
def generate_random_string(length):
characters = string.ascii_letters + string.digits
return ''.join(random.choice(characters) for _ in range(length))
# Helper function to determine if a MAC address is random
def is_random_mac(mac):
# Check if second character matches "2", "6", "A", "E" (case insensitive)
@@ -555,13 +529,14 @@ def is_random_mac(mac):
break
return is_random
#-------------------------------------------------------------------------------
# Helper function to calculate number of children
def get_number_of_children(mac, devices):
# Count children by checking devParentMAC for each device
return sum(1 for dev in devices if dev.get("devParentMAC", "").strip() == mac.strip())
#-------------------------------------------------------------------------------
# Function to convert IP to a long integer
def format_ip_long(ip_address):
try:
@@ -596,8 +571,6 @@ def add_json_list (row, list):
return list
#-------------------------------------------------------------------------------
# Checks if the object has a __dict__ attribute. If it does, it assumes that it's an instance of a class and serializes its attributes dynamically.
class NotiStrucEncoder(json.JSONEncoder):
@@ -607,19 +580,6 @@ class NotiStrucEncoder(json.JSONEncoder):
return obj.__dict__
return super().default(obj)
#-------------------------------------------------------------------------------
# Creates a JSON object from a DB row
def row_to_json(names, row):
rowEntry = {}
index = 0
for name in names:
rowEntry[name]= if_byte_then_to_str(row[name])
index += 1
return rowEntry
#-------------------------------------------------------------------------------
# Get language strings from plugin JSON
def collect_lang_strings(json, pref, stringSqlParams):
@@ -633,7 +593,7 @@ def collect_lang_strings(json, pref, stringSqlParams):
return stringSqlParams
#-------------------------------------------------------------------------------
# Misc
# Date and time methods
#-------------------------------------------------------------------------------
# -------------------------------------------------------------------------------------------
@@ -661,65 +621,6 @@ def format_date_iso(date1: str) -> str:
dt = datetime.datetime.fromisoformat(date1) if isinstance(date1, str) else date1
return dt.isoformat()
# -------------------------------------------------------------------------------------------
def is_random_mac(mac: str) -> bool:
"""Determine if a MAC address is random, respecting user-defined prefixes not to mark as random."""
is_random = mac[1].upper() in ["2", "6", "A", "E"]
# Get prefixes from settings
prefixes = get_setting_value("UI_NOT_RANDOM_MAC")
# If detected as random, make sure it doesn't start with a prefix the user wants to exclude
if is_random:
for prefix in prefixes:
if mac.upper().startswith(prefix.upper()):
is_random = False
break
return is_random
# -------------------------------------------------------------------------------------------
def get_date_from_period(period):
"""
Convert a period request parameter into an SQLite date expression.
Equivalent to PHP getDateFromPeriod().
Returns a string like "date('now', '-7 day')"
"""
days_map = {
'7 days': 7,
'1 month': 30,
'1 year': 365,
'100 years': 3650, # actually 10 years in original PHP
}
days = days_map.get(period, 1) # default 1 day
period_sql = f"date('now', '-{days} day')"
return period_sql
#-------------------------------------------------------------------------------
def print_table_schema(db, table):
sql = db.sql
sql.execute(f"PRAGMA table_info({table})")
result = sql.fetchall()
if not result:
mylog('none', f'[Schema] Table "{table}" not found or has no columns.')
return
mylog('debug', f'[Schema] Structure for table: {table}')
header = f"{'cid':<4} {'name':<20} {'type':<10} {'notnull':<8} {'default':<10} {'pk':<2}"
mylog('debug', header)
mylog('debug', '-' * len(header))
for row in result:
# row = (cid, name, type, notnull, dflt_value, pk)
line = f"{row[0]:<4} {row[1]:<20} {row[2]:<10} {row[3]:<8} {str(row[4]):<10} {row[5]:<2}"
mylog('debug', line)
#-------------------------------------------------------------------------------
def checkNewVersion():
mylog('debug', [f"[Version check] Checking if new version available"])
@@ -761,22 +662,6 @@ def checkNewVersion():
return newVersion
#-------------------------------------------------------------------------------
def initOrSetParam(db, parID, parValue):
sql = db.sql
sql.execute ("INSERT INTO Parameters(par_ID, par_Value) VALUES('"+str(parID)+"', '"+str(parValue)+"') ON CONFLICT(par_ID) DO UPDATE SET par_Value='"+str(parValue)+"' where par_ID='"+str(parID)+"'")
db.commitDB()
#-------------------------------------------------------------------------------
class json_obj:
def __init__(self, jsn, columnNames):
self.json = jsn
self.columnNames = columnNames
#-------------------------------------------------------------------------------
class noti_obj:
def __init__(self, json, text, html):