mirror of
https://github.com/jokob-sk/NetAlertX.git
synced 2025-12-07 09:36:05 -08:00
@@ -1,121 +1,134 @@
|
||||
from front.plugins.plugin_helper import is_mac
|
||||
from logger import mylog
|
||||
from models.plugin_object_instance import PluginObjectInstance
|
||||
from database import get_temp_db_connection
|
||||
|
||||
|
||||
# -------------------------------------------------------------------------------
|
||||
# Device object handling (WIP)
|
||||
# -------------------------------------------------------------------------------
|
||||
class DeviceInstance:
|
||||
def __init__(self, db):
|
||||
self.db = db
|
||||
|
||||
# Get all
|
||||
def getAll(self):
|
||||
self.db.sql.execute("""
|
||||
SELECT * FROM Devices
|
||||
""")
|
||||
return self.db.sql.fetchall()
|
||||
|
||||
# Get all with unknown names
|
||||
def getUnknown(self):
|
||||
self.db.sql.execute("""
|
||||
SELECT * FROM Devices WHERE devName in ("(unknown)", "(name not found)", "" )
|
||||
""")
|
||||
return self.db.sql.fetchall()
|
||||
|
||||
# Get specific column value based on devMac
|
||||
def getValueWithMac(self, column_name, devMac):
|
||||
query = f"SELECT {column_name} FROM Devices WHERE devMac = ?"
|
||||
self.db.sql.execute(query, (devMac,))
|
||||
result = self.db.sql.fetchone()
|
||||
return result[column_name] if result else None
|
||||
|
||||
# Get all down
|
||||
def getDown(self):
|
||||
self.db.sql.execute("""
|
||||
SELECT * FROM Devices WHERE devAlertDown = 1 and devPresentLastScan = 0
|
||||
""")
|
||||
return self.db.sql.fetchall()
|
||||
|
||||
# Get all down
|
||||
def getOffline(self):
|
||||
self.db.sql.execute("""
|
||||
SELECT * FROM Devices WHERE devPresentLastScan = 0
|
||||
""")
|
||||
return self.db.sql.fetchall()
|
||||
|
||||
# Get a device by devGUID
|
||||
def getByGUID(self, devGUID):
|
||||
self.db.sql.execute("SELECT * FROM Devices WHERE devGUID = ?", (devGUID,))
|
||||
result = self.db.sql.fetchone()
|
||||
return dict(result) if result else None
|
||||
|
||||
# Check if a device exists by devGUID
|
||||
def exists(self, devGUID):
|
||||
self.db.sql.execute(
|
||||
"SELECT COUNT(*) AS count FROM Devices WHERE devGUID = ?", (devGUID,)
|
||||
)
|
||||
result = self.db.sql.fetchone()
|
||||
return result["count"] > 0
|
||||
|
||||
# Get a device by its last IP address
|
||||
def getByIP(self, ip):
|
||||
self.db.sql.execute("SELECT * FROM Devices WHERE devLastIP = ?", (ip,))
|
||||
row = self.db.sql.fetchone()
|
||||
return dict(row) if row else None
|
||||
|
||||
# Search devices by partial mac, name or IP
|
||||
def search(self, query):
|
||||
like = f"%{query}%"
|
||||
self.db.sql.execute(
|
||||
"SELECT * FROM Devices WHERE devMac LIKE ? OR devName LIKE ? OR devLastIP LIKE ?",
|
||||
(like, like, like),
|
||||
)
|
||||
rows = self.db.sql.fetchall()
|
||||
# --- helpers --------------------------------------------------------------
|
||||
def _fetchall(self, query, params=()):
|
||||
conn = get_temp_db_connection()
|
||||
rows = conn.execute(query, params).fetchall()
|
||||
conn.close()
|
||||
return [dict(r) for r in rows]
|
||||
|
||||
# Get the most recently discovered device
|
||||
def getLatest(self):
|
||||
self.db.sql.execute("SELECT * FROM Devices ORDER BY devFirstConnection DESC LIMIT 1")
|
||||
row = self.db.sql.fetchone()
|
||||
def _fetchone(self, query, params=()):
|
||||
conn = get_temp_db_connection()
|
||||
row = conn.execute(query, params).fetchone()
|
||||
conn.close()
|
||||
return dict(row) if row else None
|
||||
|
||||
def getNetworkTopology(self):
|
||||
"""Returns nodes and links for the current Devices table.
|
||||
def _execute(self, query, params=()):
|
||||
conn = get_temp_db_connection()
|
||||
cur = conn.cursor()
|
||||
cur.execute(query, params)
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
Nodes: {id, name, vendor}
|
||||
Links: {source, target, port}
|
||||
"""
|
||||
self.db.sql.execute("SELECT devName, devMac, devParentMAC, devParentPort, devVendor FROM Devices")
|
||||
rows = self.db.sql.fetchall()
|
||||
nodes = []
|
||||
links = []
|
||||
for row in rows:
|
||||
nodes.append({"id": row['devMac'], "name": row['devName'], "vendor": row['devVendor']})
|
||||
if row['devParentMAC']:
|
||||
links.append({"source": row['devParentMAC'], "target": row['devMac'], "port": row['devParentPort']})
|
||||
# --- public API -----------------------------------------------------------
|
||||
def getAll(self):
|
||||
return self._fetchall("SELECT * FROM Devices")
|
||||
|
||||
def getUnknown(self):
|
||||
return self._fetchall("""
|
||||
SELECT * FROM Devices
|
||||
WHERE devName IN ("(unknown)", "(name not found)", "")
|
||||
""")
|
||||
|
||||
def getValueWithMac(self, column_name, devMac):
|
||||
row = self._fetchone(f"""
|
||||
SELECT {column_name} FROM Devices WHERE devMac = ?
|
||||
""", (devMac,))
|
||||
return row.get(column_name) if row else None
|
||||
|
||||
def getDown(self):
|
||||
return self._fetchall("""
|
||||
SELECT * FROM Devices
|
||||
WHERE devAlertDown = 1 AND devPresentLastScan = 0
|
||||
""")
|
||||
|
||||
def getOffline(self):
|
||||
return self._fetchall("""
|
||||
SELECT * FROM Devices
|
||||
WHERE devPresentLastScan = 0
|
||||
""")
|
||||
|
||||
def getByGUID(self, devGUID):
|
||||
return self._fetchone("""
|
||||
SELECT * FROM Devices WHERE devGUID = ?
|
||||
""", (devGUID,))
|
||||
|
||||
def exists(self, devGUID):
|
||||
row = self._fetchone("""
|
||||
SELECT COUNT(*) as count FROM Devices WHERE devGUID = ?
|
||||
""", (devGUID,))
|
||||
return row['count'] > 0 if row else False
|
||||
|
||||
def getByIP(self, ip):
|
||||
return self._fetchone("""
|
||||
SELECT * FROM Devices WHERE devLastIP = ?
|
||||
""", (ip,))
|
||||
|
||||
def search(self, query):
|
||||
like = f"%{query}%"
|
||||
return self._fetchall("""
|
||||
SELECT * FROM Devices
|
||||
WHERE devMac LIKE ? OR devName LIKE ? OR devLastIP LIKE ?
|
||||
""", (like, like, like))
|
||||
|
||||
def getLatest(self):
|
||||
return self._fetchone("""
|
||||
SELECT * FROM Devices
|
||||
ORDER BY devFirstConnection DESC LIMIT 1
|
||||
""")
|
||||
|
||||
def getNetworkTopology(self):
|
||||
rows = self._fetchall("""
|
||||
SELECT devName, devMac, devParentMAC, devParentPort, devVendor FROM Devices
|
||||
""")
|
||||
nodes = [{"id": r["devMac"], "name": r["devName"], "vendor": r["devVendor"]} for r in rows]
|
||||
links = [{"source": r["devParentMAC"], "target": r["devMac"], "port": r["devParentPort"]}
|
||||
for r in rows if r["devParentMAC"]]
|
||||
return {"nodes": nodes, "links": links}
|
||||
|
||||
# Update a specific field for a device
|
||||
def updateField(self, devGUID, field, value):
|
||||
if not self.exists(devGUID):
|
||||
m = f"[Device] In 'updateField': GUID {devGUID} not found."
|
||||
mylog("none", m)
|
||||
raise ValueError(m)
|
||||
msg = f"[Device] updateField: GUID {devGUID} not found"
|
||||
mylog("none", msg)
|
||||
raise ValueError(msg)
|
||||
self._execute(f"UPDATE Devices SET {field}=? WHERE devGUID=?", (value, devGUID))
|
||||
|
||||
self.db.sql.execute(
|
||||
f"""
|
||||
UPDATE Devices SET {field} = ? WHERE devGUID = ?
|
||||
""",
|
||||
(value, devGUID),
|
||||
)
|
||||
self.db.commitDB()
|
||||
|
||||
# Delete a device by devGUID
|
||||
def delete(self, devGUID):
|
||||
if not self.exists(devGUID):
|
||||
m = f"[Device] In 'delete': GUID {devGUID} not found."
|
||||
mylog("none", m)
|
||||
raise ValueError(m)
|
||||
msg = f"[Device] delete: GUID {devGUID} not found"
|
||||
mylog("none", msg)
|
||||
raise ValueError(msg)
|
||||
self._execute("DELETE FROM Devices WHERE devGUID=?", (devGUID,))
|
||||
|
||||
self.db.sql.execute("DELETE FROM Devices WHERE devGUID = ?", (devGUID,))
|
||||
self.db.commitDB()
|
||||
def resolvePrimaryID(self, target):
|
||||
if is_mac(target):
|
||||
return target.lower()
|
||||
dev = self.getByIP(target)
|
||||
return dev['devMac'].lower() if dev else None
|
||||
|
||||
def getOpenPorts(self, target):
|
||||
primary = self.resolvePrimaryID(target)
|
||||
if not primary:
|
||||
return []
|
||||
|
||||
objs = PluginObjectInstance().getByField(
|
||||
plugPrefix='NMAP',
|
||||
matchedColumn='Object_PrimaryID',
|
||||
matchedKey=primary,
|
||||
returnFields=['Object_SecondaryID', 'Watched_Value2']
|
||||
)
|
||||
|
||||
ports = []
|
||||
for o in objs:
|
||||
|
||||
port = int(o.get('Object_SecondaryID') or 0)
|
||||
|
||||
ports.append({"port": port, "service": o.get('Watched_Value2', '')})
|
||||
|
||||
return ports
|
||||
|
||||
Reference in New Issue
Block a user