mirror of
https://github.com/jokob-sk/NetAlertX.git
synced 2025-12-07 09:36:05 -08:00
🔌UNIFI work
This commit is contained in:
@@ -389,6 +389,38 @@
|
||||
"show": true,
|
||||
"type": "label"
|
||||
},
|
||||
{
|
||||
"column": "HelpVal1",
|
||||
"mapped_to_column": "cur_NetworkNodeMAC",
|
||||
"css_classes": "col-sm-2",
|
||||
"default_value": "",
|
||||
"localized": ["name"],
|
||||
"name": [
|
||||
{
|
||||
"language_code": "en_us",
|
||||
"string": "Parent Network MAC"
|
||||
}
|
||||
],
|
||||
"options": [],
|
||||
"show": true,
|
||||
"type": "label"
|
||||
},
|
||||
{
|
||||
"column": "HelpVal2",
|
||||
"mapped_to_column": "cur_PORT",
|
||||
"css_classes": "col-sm-2",
|
||||
"default_value": "",
|
||||
"localized": ["name"],
|
||||
"name": [
|
||||
{
|
||||
"language_code": "en_us",
|
||||
"string": "Port"
|
||||
}
|
||||
],
|
||||
"options": [],
|
||||
"show": true,
|
||||
"type": "label"
|
||||
},
|
||||
{
|
||||
"column": "Status",
|
||||
"css_classes": "col-sm-1",
|
||||
@@ -493,7 +525,7 @@
|
||||
}
|
||||
},
|
||||
{
|
||||
"default_value": "python3 /app/front/plugins/unifi_import/script.py username={username} password={password} host={host} sites={sites} port={port} verifyssl={verifyssl} version={version} fullimport={fullimport}",
|
||||
"default_value": "python3 /app/front/plugins/unifi_import/script.py",
|
||||
"description": [
|
||||
{
|
||||
"language_code": "en_us",
|
||||
|
||||
@@ -42,52 +42,30 @@ pluginName = 'UNFIMP'
|
||||
|
||||
def main():
|
||||
|
||||
mylog('verbose', ['[UNFIMP] In script'])
|
||||
mylog('verbose', [f'[{pluginName}] In script'])
|
||||
|
||||
|
||||
# init global variables
|
||||
global UNIFI_USERNAME, UNIFI_PASSWORD, UNIFI_HOST, UNIFI_SITES, PORT, VERIFYSSL, VERSION, FULL_IMPORT
|
||||
|
||||
|
||||
parser = argparse.ArgumentParser(description='Import devices from a UNIFI controller')
|
||||
|
||||
parser.add_argument('username', action="store", help="Username used to login into the UNIFI controller")
|
||||
parser.add_argument('password', action="store", help="Password used to login into the UNIFI controller")
|
||||
parser.add_argument('host', action="store", help="Host url or IP address where the UNIFI controller is hosted (excluding http://)")
|
||||
parser.add_argument('sites', action="store", help="Name of the sites (usually 'default', check the URL in your UniFi controller UI). Separated by comma (,) if passing multiple sites")
|
||||
parser.add_argument('port', action="store", help="Usually 8443")
|
||||
parser.add_argument('verifyssl', action="store", help="verify SSL certificate [true|false]")
|
||||
parser.add_argument('version', action="store", help="The base version of the controller API [v4|v5|unifiOS|UDMP-unifiOS]")
|
||||
parser.add_argument('fullimport', action="store", help="Defines if a full import or only online devices hould be imported [disabled|once|always]")
|
||||
|
||||
values = parser.parse_args()
|
||||
|
||||
|
||||
|
||||
|
||||
# parse output
|
||||
plugin_objects = Plugin_Objects(RESULT_FILE)
|
||||
|
||||
plugin_objects = Plugin_Objects(RESULT_FILE)
|
||||
|
||||
mylog('verbose', [f'[UNFIMP] Check if all login information is available: {values}'])
|
||||
UNIFI_USERNAME = get_setting_value("UNFIMP_username")
|
||||
UNIFI_PASSWORD = get_setting_value("UNFIMP_password")
|
||||
UNIFI_HOST = get_setting_value("UNFIMP_host")
|
||||
UNIFI_SITES = get_setting_value("UNFIMP_sites")
|
||||
PORT = get_setting_value("UNFIMP_port")
|
||||
VERIFYSSL = get_setting_value("UNFIMP_verifyssl")
|
||||
VERSION = get_setting_value("UNFIMP_version")
|
||||
FULL_IMPORT = get_setting_value("UNFIMP_fullimport")
|
||||
|
||||
if values.username and values.password and values.host and values.sites:
|
||||
|
||||
UNIFI_USERNAME = values.username.split('=')[1]
|
||||
UNIFI_PASSWORD = values.password.split('=')[1]
|
||||
UNIFI_HOST = values.host.split('=')[1]
|
||||
UNIFI_SITES = values.sites.split('=')[1]
|
||||
PORT = values.port.split('=')[1]
|
||||
VERIFYSSL = values.verifyssl.split('=')[1]
|
||||
VERSION = values.version.split('=')[1]
|
||||
FULL_IMPORT = values.fullimport.split('=')[1]
|
||||
|
||||
plugin_objects = get_entries(plugin_objects)
|
||||
plugin_objects = get_entries(plugin_objects)
|
||||
|
||||
plugin_objects.write_result_file()
|
||||
|
||||
|
||||
mylog('verbose', [f'[UNFIMP] Scan finished, found {len(plugin_objects)} devices'])
|
||||
mylog('verbose', [f'[{pluginName}] Scan finished, found {len(plugin_objects)} devices'])
|
||||
|
||||
# .............................................
|
||||
|
||||
@@ -98,152 +76,166 @@ def get_entries(plugin_objects: Plugin_Objects) -> Plugin_Objects:
|
||||
lock_file_value = read_lock_file()
|
||||
perform_full_run = check_full_run_state(FULL_IMPORT, lock_file_value)
|
||||
|
||||
mylog('verbose', [f'[{pluginName}] sites: {UNIFI_SITES}'])
|
||||
|
||||
sites = []
|
||||
|
||||
if ',' in UNIFI_SITES:
|
||||
sites = UNIFI_SITES.split(',')
|
||||
|
||||
else:
|
||||
sites.append(UNIFI_SITES)
|
||||
|
||||
if (VERIFYSSL.upper() == "TRUE"):
|
||||
VERIFYSSL = True
|
||||
else:
|
||||
VERIFYSSL = False
|
||||
|
||||
for site in sites:
|
||||
|
||||
# mylog('verbose', [f'[{pluginName}] sites: {sites}'])
|
||||
|
||||
for site in UNIFI_SITES:
|
||||
|
||||
mylog('verbose', [f'[{pluginName}] site: {site}'])
|
||||
|
||||
c = Controller(UNIFI_HOST, UNIFI_USERNAME, UNIFI_PASSWORD, port=PORT, version=VERSION, ssl_verify=VERIFYSSL, site_id=site)
|
||||
|
||||
mylog('verbose', [f'[UNFIMP] Identify Unifi Devices'])
|
||||
# get all Unifi devices
|
||||
for ap in c.get_aps():
|
||||
|
||||
# mylog('verbose', [f'{json.dumps(ap)}'])
|
||||
|
||||
deviceType = ''
|
||||
if (ap['type'] == 'udm'):
|
||||
deviceType = 'Router'
|
||||
elif (ap['type'] == 'usg'):
|
||||
deviceType = 'Router'
|
||||
elif (ap['type'] == 'usw'):
|
||||
deviceType = 'Switch'
|
||||
elif (ap['type'] == 'uap'):
|
||||
deviceType = 'AP'
|
||||
|
||||
name = get_unifi_val(ap, 'name')
|
||||
hostName = get_unifi_val(ap, 'hostname')
|
||||
|
||||
name = set_name(name, hostName)
|
||||
|
||||
ipTmp = get_unifi_val(ap, 'ip')
|
||||
|
||||
# if IP not found use a default value
|
||||
if ipTmp == "null":
|
||||
ipTmp = '0.0.0.0'
|
||||
|
||||
plugin_objects.add_object(
|
||||
primaryId=ap['mac'],
|
||||
secondaryId=ipTmp,
|
||||
watched1=name,
|
||||
watched2='Ubiquiti Networks Inc.',
|
||||
watched3=deviceType,
|
||||
watched4=ap['state'],
|
||||
extra=get_unifi_val(ap, 'connection_network_name')
|
||||
)
|
||||
|
||||
|
||||
mylog('verbose', [f'[UNFIMP] Found {len(plugin_objects)} Unifi Devices'])
|
||||
|
||||
|
||||
online_macs = set()
|
||||
processed_macs = []
|
||||
|
||||
# get_clients() returns all clients which are currently online.
|
||||
for cl in c.get_clients():
|
||||
mylog('verbose', [f'[{pluginName}] Get Online Devices'])
|
||||
|
||||
# mylog('verbose', [f'{json.dumps(cl)}'])
|
||||
online_macs.add(cl['mac'])
|
||||
# Collect details for online clients
|
||||
collect_details(
|
||||
device_type={'cl': ''},
|
||||
devices=c.get_clients(),
|
||||
online_macs=online_macs,
|
||||
processed_macs=processed_macs,
|
||||
plugin_objects=plugin_objects,
|
||||
device_label='client',
|
||||
device_vendor=""
|
||||
)
|
||||
|
||||
|
||||
mylog('verbose', [f'[UNFIMP] Found {len(plugin_objects)} Online Devices'])
|
||||
mylog('verbose', [f'[{pluginName}] Found {len(plugin_objects)} Online Devices'])
|
||||
|
||||
# get_users() returns all clients known by the controller
|
||||
for user in c.get_users():
|
||||
mylog('verbose', [f'[{pluginName}] Identify Unifi Devices'])
|
||||
|
||||
#mylog('verbose', [f'{json.dumps(user)}'])
|
||||
# Collect details for Unifi devices
|
||||
collect_details(
|
||||
device_type={
|
||||
'udm': 'Router',
|
||||
'usg': 'Router',
|
||||
'usw': 'Switch',
|
||||
'uap': 'AP'
|
||||
},
|
||||
devices=c.get_aps(),
|
||||
online_macs=online_macs,
|
||||
processed_macs=processed_macs,
|
||||
plugin_objects=plugin_objects,
|
||||
device_label='ap',
|
||||
device_vendor="Ubiquiti Networks Inc."
|
||||
)
|
||||
|
||||
name = get_unifi_val(user, 'name')
|
||||
hostName = get_unifi_val(user, 'hostname')
|
||||
mylog('verbose', [f'[{pluginName}] Found {len(plugin_objects)} Unifi Devices'])
|
||||
|
||||
name = set_name(name, hostName)
|
||||
# Collect details for users
|
||||
collect_details(
|
||||
device_type={'user': ''},
|
||||
devices=c.get_users(),
|
||||
online_macs=online_macs,
|
||||
processed_macs=processed_macs,
|
||||
plugin_objects=plugin_objects,
|
||||
device_label='user',
|
||||
device_vendor=""
|
||||
)
|
||||
|
||||
status = 1 if user['mac'] in online_macs else 0
|
||||
mylog('verbose', [f'[{pluginName}] Found {len(plugin_objects)} Users'])
|
||||
|
||||
if status == 1 or perform_full_run is True:
|
||||
|
||||
ipTmp = get_unifi_val(user, 'last_ip')
|
||||
|
||||
if ipTmp == 'null':
|
||||
ipTmp = get_unifi_val(user, 'fixed_ip')
|
||||
|
||||
# if IP not found use a default value
|
||||
if ipTmp == "null":
|
||||
ipTmp = '0.0.0.0'
|
||||
|
||||
plugin_objects.add_object(
|
||||
primaryId=user['mac'],
|
||||
secondaryId=ipTmp,
|
||||
watched1=name,
|
||||
watched2=get_unifi_val(user, 'oui'),
|
||||
watched3='Other',
|
||||
watched4=status,
|
||||
extra=get_unifi_val(user, 'last_connection_network_name')
|
||||
)
|
||||
|
||||
# check if the lockfile needs to be adapted
|
||||
|
||||
mylog('verbose', [f'[UNFIMP] check if Lock file needs to be modified'])
|
||||
|
||||
mylog('verbose', [f'[{pluginName}] check if Lock file needs to be modified'])
|
||||
set_lock_file_value(FULL_IMPORT, lock_file_value)
|
||||
|
||||
|
||||
mylog('verbose', [f'[UNFIMP] Found {len(plugin_objects)} Clients overall'])
|
||||
mylog('verbose', [f'[{pluginName}] Found {len(plugin_objects)} Clients overall'])
|
||||
|
||||
return plugin_objects
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
def get_unifi_val(obj, key):
|
||||
def collect_details(device_type, devices, online_macs, processed_macs, plugin_objects, device_label, device_vendor):
|
||||
for device in devices:
|
||||
mylog('verbose', [f'{json.dumps(device)}'])
|
||||
|
||||
res = ''
|
||||
name = get_name(get_unifi_val(device, 'name'), get_unifi_val(device, 'hostname'))
|
||||
ipTmp = get_ip(get_unifi_val(device, 'last_ip'), get_unifi_val(device, 'fixed_ip'), get_unifi_val(device, 'ip'))
|
||||
macTmp = device['mac']
|
||||
status = 1 if macTmp in online_macs else device.get('state', 0)
|
||||
deviceType = device_type.get(device.get('type'), '')
|
||||
|
||||
res = obj.get(key, None)
|
||||
|
||||
if res not in ['','None', None]:
|
||||
return res
|
||||
|
||||
mylog('debug', [f'[{pluginName}] Value not found for key "{key}" in obj "{json.dumps(obj)}"'])
|
||||
|
||||
return 'null'
|
||||
# Add object only if not processed
|
||||
if macTmp not in processed_macs:
|
||||
plugin_objects.add_object(
|
||||
primaryId=macTmp,
|
||||
secondaryId=ipTmp,
|
||||
watched1=name,
|
||||
watched2=get_unifi_val(device, 'oui', device_vendor),
|
||||
watched3=deviceType,
|
||||
watched4=status,
|
||||
extra=get_unifi_val(device, 'connection_network_name', ''),
|
||||
foreignKey="",
|
||||
helpVal1=get_parent_mac(get_unifi_val(device, 'uplink_mac'), get_unifi_val(device, 'ap_mac'), get_unifi_val(device, 'sw_mac')),
|
||||
helpVal2=get_port(get_unifi_val(device, 'sw_port'), get_unifi_val(device, 'uplink_remote_port')),
|
||||
helpVal3=device_label,
|
||||
helpVal4="",
|
||||
)
|
||||
processed_macs.append(macTmp)
|
||||
# -----------------------------------------------------------------------------
|
||||
def get_unifi_val(obj, key, default='null'):
|
||||
if isinstance(obj, dict):
|
||||
if key in obj and obj[key] not in ['', 'None', None]:
|
||||
return obj[key]
|
||||
for k, v in obj.items():
|
||||
if isinstance(v, dict):
|
||||
result = get_unifi_val(v, key, default)
|
||||
if result not in ['','None', None, 'null']:
|
||||
return result
|
||||
|
||||
mylog('debug', [f'[{pluginName}] Value not found for key "{key}" in obj "{json.dumps(obj)}"'])
|
||||
return default
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
def set_name(name: str, hostName: str) -> str:
|
||||
def get_name(*names: str) -> str:
|
||||
for name in names:
|
||||
if name and name != 'null':
|
||||
return name
|
||||
return 'null'
|
||||
|
||||
if name != 'null':
|
||||
return name
|
||||
# -----------------------------------------------------------------------------
|
||||
def get_parent_mac(*macs: str) -> str:
|
||||
for mac in macs:
|
||||
if mac and mac != 'null':
|
||||
return mac
|
||||
return 'null'
|
||||
|
||||
elif name == 'null' and hostName != 'null':
|
||||
return hostName
|
||||
# -----------------------------------------------------------------------------
|
||||
def get_port(*ports: str) -> str:
|
||||
for port in ports:
|
||||
if port and port != 'null':
|
||||
return port
|
||||
return 'null'
|
||||
|
||||
else:
|
||||
return 'null'
|
||||
# -----------------------------------------------------------------------------
|
||||
def get_port(*macs: str) -> str:
|
||||
for mac in macs:
|
||||
if mac and mac != 'null':
|
||||
return mac
|
||||
return 'null'
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
def get_ip(*ips: str) -> str:
|
||||
for ip in ips:
|
||||
if ip and ip != 'null':
|
||||
return ip
|
||||
return '0:0:0:0'
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
def set_lock_file_value(config_value: str, lock_file_value: bool) -> None:
|
||||
|
||||
mylog('verbose', [f'[UNFIMP] Lock Params: config_value={config_value}, lock_file_value={lock_file_value}'])
|
||||
mylog('verbose', [f'[{pluginName}] Lock Params: config_value={config_value}, lock_file_value={lock_file_value}'])
|
||||
# set lock if 'once' is set and the lock is not set
|
||||
if config_value == 'once' and lock_file_value is False:
|
||||
out = 1
|
||||
@@ -251,10 +243,10 @@ def set_lock_file_value(config_value: str, lock_file_value: bool) -> None:
|
||||
elif config_value != 'once' and lock_file_value is True:
|
||||
out = 0
|
||||
else:
|
||||
mylog('verbose', [f'[UNFIMP] No change on lock file needed'])
|
||||
mylog('verbose', [f'[{pluginName}] No change on lock file needed'])
|
||||
return
|
||||
|
||||
mylog('verbose', [f'[UNFIMP] Setting lock value for "full import" to {out}'])
|
||||
mylog('verbose', [f'[{pluginName}] Setting lock value for "full import" to {out}'])
|
||||
with open(LOCK_FILE, 'w') as lock_file:
|
||||
lock_file.write(str(out))
|
||||
|
||||
@@ -272,10 +264,10 @@ def read_lock_file() -> bool:
|
||||
# -----------------------------------------------------------------------------
|
||||
def check_full_run_state(config_value: str, lock_file_value: bool) -> bool:
|
||||
if config_value == 'always' or (config_value == 'once' and lock_file_value == False):
|
||||
mylog('verbose', [f'[UNFIMP] Full import needs to be done: config_value: {config_value} and lock_file_value: {lock_file_value}'])
|
||||
mylog('verbose', [f'[{pluginName}] Full import needs to be done: config_value: {config_value} and lock_file_value: {lock_file_value}'])
|
||||
return True
|
||||
else:
|
||||
mylog('verbose', [f'[UNFIMP] Full import NOT needed: config_value: {config_value} and lock_file_value: {lock_file_value}'])
|
||||
mylog('verbose', [f'[{pluginName}] Full import NOT needed: config_value: {config_value} and lock_file_value: {lock_file_value}'])
|
||||
return False
|
||||
|
||||
#===============================================================================
|
||||
|
||||
Reference in New Issue
Block a user