mirror of
https://github.com/jokob-sk/NetAlertX.git
synced 2025-12-07 09:36:05 -08:00
Merge pull request #442 from tuhriel/unifi_full_import
Added functionality to perform a full import
This commit is contained in:
1386
front/plugins/unifi_import/config.json
Executable file → Normal file
1386
front/plugins/unifi_import/config.json
Executable file → Normal file
File diff suppressed because it is too large
Load Diff
72
front/plugins/unifi_import/script.py
Executable file → Normal file
72
front/plugins/unifi_import/script.py
Executable file → Normal file
@@ -2,7 +2,7 @@
|
||||
# Inspired by https://github.com/stevehoek/Pi.Alert
|
||||
|
||||
# Example call
|
||||
# python3 /home/pi/pialert/front/plugins/unifi_import/script.py username=pialert password=passw0rd host=192.168.1.1 site=default protocol=https port=8443 version='UDMP-unifiOS'
|
||||
# python3 /home/pi/pialert/front/plugins/unifi_import/script.py username=pialert password=passw0rd host=192.168.1.1 site=default protocol=https port=443 verifyssl=false version='UDMP-unifiOS'
|
||||
# python3 /home/pi/pialert/front/plugins/unifi_import/script.py username=pialert password=passw0rd host=192.168.1.1 sites=sdefault port=8443 verifyssl=false version=v5
|
||||
|
||||
from __future__ import unicode_literals
|
||||
@@ -28,6 +28,7 @@ from logger import mylog
|
||||
CUR_PATH = str(pathlib.Path(__file__).parent.resolve())
|
||||
LOG_FILE = os.path.join(CUR_PATH, 'script.log')
|
||||
RESULT_FILE = os.path.join(CUR_PATH, 'last_result.log')
|
||||
LOCK_FILE = os.path.join(CUR_PATH, 'full_run.lock')
|
||||
|
||||
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
|
||||
|
||||
@@ -39,7 +40,7 @@ def main():
|
||||
|
||||
|
||||
# init global variables
|
||||
global UNIFI_USERNAME, UNIFI_PASSWORD, UNIFI_HOST, UNIFI_SITES, PORT, VERIFYSSL, VERSION
|
||||
global UNIFI_USERNAME, UNIFI_PASSWORD, UNIFI_HOST, UNIFI_SITES, PORT, VERIFYSSL, VERSION, FULL_IMPORT
|
||||
|
||||
|
||||
parser = argparse.ArgumentParser(description='Import devices from a UNIFI controller')
|
||||
@@ -51,9 +52,13 @@ def main():
|
||||
parser.add_argument('port', action="store", help="Usually 8443")
|
||||
parser.add_argument('verifyssl', action="store", help="verify SSL certificate [true|false]")
|
||||
parser.add_argument('version', action="store", help="The base version of the controller API [v4|v5|unifiOS|UDMP-unifiOS]")
|
||||
parser.add_argument('fullimport', action="store", help="Defines if a full import or only online devices hould be imported [disabled|once|always]")
|
||||
|
||||
values = parser.parse_args()
|
||||
|
||||
|
||||
|
||||
|
||||
# parse output
|
||||
plugin_objects = Plugin_Objects(RESULT_FILE)
|
||||
|
||||
@@ -69,6 +74,7 @@ def main():
|
||||
PORT = values.port.split('=')[1]
|
||||
VERIFYSSL = values.verifyssl.split('=')[1]
|
||||
VERSION = values.version.split('=')[1]
|
||||
FULL_IMPORT = values.fullimport.split('=')[1]
|
||||
|
||||
plugin_objects = get_entries(plugin_objects)
|
||||
|
||||
@@ -79,9 +85,14 @@ def main():
|
||||
|
||||
# .............................................
|
||||
|
||||
def get_entries(plugin_objects):
|
||||
def get_entries(plugin_objects: Plugin_Objects) -> Plugin_Objects:
|
||||
global VERIFYSSL
|
||||
|
||||
# check if the full run must be run:
|
||||
lock_file_value = read_lock_file()
|
||||
perform_full_run = check_full_run_state(FULL_IMPORT, lock_file_value)
|
||||
|
||||
|
||||
sites = []
|
||||
|
||||
if ',' in UNIFI_SITES:
|
||||
@@ -148,7 +159,7 @@ def get_entries(plugin_objects):
|
||||
# get_users() returns all clients known by the controller
|
||||
for user in c.get_users():
|
||||
|
||||
mylog('verbose', [f'{json.dumps(user)}'])
|
||||
#mylog('verbose', [f'{json.dumps(user)}'])
|
||||
|
||||
name = get_unifi_val(user, 'name')
|
||||
hostName = get_unifi_val(user, 'hostname')
|
||||
@@ -157,7 +168,7 @@ def get_entries(plugin_objects):
|
||||
|
||||
status = 1 if user['mac'] in online_macs else 0
|
||||
|
||||
if status == 1:
|
||||
if status == 1 or perform_full_run is True:
|
||||
|
||||
ipTmp = get_unifi_val(user, 'last_ip')
|
||||
|
||||
@@ -174,11 +185,17 @@ def get_entries(plugin_objects):
|
||||
extra=get_unifi_val(user, 'last_connection_network_name')
|
||||
)
|
||||
|
||||
|
||||
# check if the lockfile needs to be adapted
|
||||
|
||||
mylog('verbose', [f'[UNFIMP] check if Lock file needs to be modified'])
|
||||
set_lock_file_value(FULL_IMPORT, lock_file_value)
|
||||
|
||||
|
||||
mylog('verbose', [f'[UNFIMP] Found {len(plugin_objects)} Clients overall'])
|
||||
|
||||
return plugin_objects
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
def get_unifi_val(obj, key):
|
||||
|
||||
@@ -192,8 +209,8 @@ def get_unifi_val(obj, key):
|
||||
|
||||
return 'null'
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
def set_name(name: str, hostName: str) -> str:
|
||||
|
||||
if name != 'null':
|
||||
@@ -205,8 +222,47 @@ def set_name(name: str, hostName: str) -> str:
|
||||
else:
|
||||
return 'null'
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
def set_lock_file_value(config_value: str, lock_file_value: bool) -> None:
|
||||
|
||||
mylog('verbose', [f'[UNFIMP] Lock Params: config_value={config_value}, lock_file_value={lock_file_value}'])
|
||||
# set lock if 'once' is set and the lock is not set
|
||||
if config_value == 'once' and lock_file_value is False:
|
||||
out = 1
|
||||
# reset lock if not 'once' is set and the lock is present
|
||||
elif config_value != 'once' and lock_file_value is True:
|
||||
out = 0
|
||||
else:
|
||||
mylog('verbose', [f'[UNFIMP] No change on lock file needed'])
|
||||
return
|
||||
|
||||
mylog('verbose', [f'[UNFIMP] Setting lock value for "full import" to {out}'])
|
||||
with open(LOCK_FILE, 'w') as lock_file:
|
||||
lock_file.write(str(out))
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
def read_lock_file() -> bool:
|
||||
|
||||
try:
|
||||
with open(LOCK_FILE, 'r') as lock_file:
|
||||
return bool(int(lock_file.readline()))
|
||||
except (FileNotFoundError, ValueError):
|
||||
return False
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
def check_full_run_state(config_value: str, lock_file_value: bool) -> bool:
|
||||
if config_value == 'always' or (config_value == 'once' and lock_file_value == False):
|
||||
mylog('verbose', [f'[UNFIMP] Full import needs to be done: config_value: {config_value} and lock_file_value: {lock_file_value}'])
|
||||
return True
|
||||
else:
|
||||
mylog('verbose', [f'[UNFIMP] Full import NOT needed: config_value: {config_value} and lock_file_value: {lock_file_value}'])
|
||||
return False
|
||||
|
||||
#===============================================================================
|
||||
# BEGIN
|
||||
#===============================================================================
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
main()
|
||||
|
||||
Reference in New Issue
Block a user