mirror of
https://github.com/jokob-sk/NetAlertX.git
synced 2025-12-07 09:36:05 -08:00
245 lines
7.7 KiB
Python
Executable File
245 lines
7.7 KiB
Python
Executable File
#!/usr/bin/env python
|
|
# Based on the work of https://github.com/stevehoek/Pi.Alert
|
|
|
|
# Example call
|
|
# python3 /home/pi/pialert/front/plugins/unifi_import/script.py username=pialert password=passw0rd host=192.168.1.1 site=default protocol=https:// port=8443
|
|
|
|
from __future__ import unicode_literals
|
|
from time import sleep, time, strftime
|
|
import requests
|
|
from requests import Request, Session, packages
|
|
import pathlib
|
|
import threading
|
|
import subprocess
|
|
import socket
|
|
import json
|
|
import argparse
|
|
import io
|
|
import sys
|
|
from requests.packages.urllib3.exceptions import InsecureRequestWarning
|
|
import pwd
|
|
import os
|
|
from unificontrol import UnifiClient
|
|
from pyunifi.controller import Controller
|
|
|
|
|
|
curPath = str(pathlib.Path(__file__).parent.resolve())
|
|
log_file = curPath + '/script.log'
|
|
last_run = curPath + '/last_result.log'
|
|
|
|
# Workflow
|
|
|
|
def main():
|
|
|
|
# init global variables
|
|
global UNIFI_USERNAME, UNIFI_PASSWORD, UNIFI_HOST
|
|
global UNIFI_SITES, PORT, PROTOCOL
|
|
|
|
last_run_logfile = open(last_run, 'a')
|
|
|
|
# empty file
|
|
last_run_logfile.write("")
|
|
|
|
parser = argparse.ArgumentParser(description='Import devices from an UNIFI controller')
|
|
|
|
parser.add_argument('username', action="store", help="Username used to login into the UNIFI controller")
|
|
parser.add_argument('password', action="store", help="Password used to login into the UNIFI controller")
|
|
parser.add_argument('host', action="store", help="Host url or IP address where the UNIFI controller is hosted (excluding http://)")
|
|
parser.add_argument('sites', action="store", help="Name of the sites (usually 'default', check the URL in your UniFi controller UI). Separated by comma (,) if passing multiple sites")
|
|
parser.add_argument('protocol', action="store", help="https:// or http://")
|
|
parser.add_argument('port', action="store", help="Usually 8443")
|
|
|
|
values = parser.parse_args()
|
|
|
|
# parse output
|
|
newEntries = []
|
|
|
|
if values.username and values.password and values.host and values.sites:
|
|
|
|
UNIFI_USERNAME = values.username.split('=')[1]
|
|
UNIFI_PASSWORD = values.password.split('=')[1]
|
|
UNIFI_HOST = values.host.split('=')[1]
|
|
UNIFI_SITES = values.sites.split('=')[1]
|
|
PROTOCOL = values.protocol.split('=')[1]
|
|
PORT = values.port.split('=')[1]
|
|
|
|
newEntries = get_entries(newEntries)
|
|
|
|
|
|
for e in newEntries:
|
|
# Insert list into the log
|
|
service_monitoring_log(e.primaryId, e.secondaryId, e.created, e.watched1, e.watched2, e.watched3, e.watched4, e.extra, e.foreignKey )
|
|
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
def get_entries(newEntries):
|
|
|
|
sites = []
|
|
|
|
if ',' in UNIFI_SITES:
|
|
sites = UNIFI_SITES.split(',')
|
|
|
|
else:
|
|
sites.append(UNIFI_SITES)
|
|
|
|
|
|
for site in sites:
|
|
|
|
c = Controller(UNIFI_HOST, UNIFI_USERNAME, UNIFI_PASSWORD, ssl_verify=False, site_id=site )
|
|
|
|
for ap in c.get_aps():
|
|
|
|
# print(f'{json.dumps(ap)}')
|
|
|
|
deviceType = ''
|
|
if (ap['type'] == 'udm'):
|
|
deviceType = 'Router'
|
|
elif (ap['type'] == 'usg'):
|
|
deviceType = 'Router'
|
|
elif (ap['type'] == 'usw'):
|
|
deviceType = 'Switch'
|
|
elif (ap['type'] == 'uap'):
|
|
deviceType = 'AP'
|
|
|
|
name = get_unifi_val(ap, 'name')
|
|
hostName = get_unifi_val(ap, 'hostname')
|
|
|
|
if name == 'null' and hostName != 'null':
|
|
name = hostName
|
|
|
|
tmpPlugObj = plugin_object_class(
|
|
ap['mac'],
|
|
ap['ip'],
|
|
name,
|
|
'Ubiquiti Networks Inc.',
|
|
deviceType,
|
|
ap['state'],
|
|
get_unifi_val(ap, 'connection_network_name')
|
|
)
|
|
|
|
newEntries.append(tmpPlugObj)
|
|
|
|
# print(f'>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
|
|
|
|
for cl in c.get_clients():
|
|
|
|
# print(f'{json.dumps(cl)}')
|
|
|
|
name = get_unifi_val(cl, 'name')
|
|
hostName = get_unifi_val(cl, 'hostname')
|
|
|
|
if name == 'null' and hostName != 'null':
|
|
name = hostName
|
|
|
|
tmpPlugObj = plugin_object_class(
|
|
cl['mac'],
|
|
cl['ip'],
|
|
name,
|
|
get_unifi_val(cl, 'oui'),
|
|
'Other',
|
|
1,
|
|
get_unifi_val(cl, 'connection_network_name')
|
|
)
|
|
|
|
# print(f'>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
|
|
|
|
for us in c.get_clients():
|
|
|
|
# print(f'{json.dumps(us)}')
|
|
|
|
name = get_unifi_val(us, 'name')
|
|
hostName = get_unifi_val(us, 'hostname')
|
|
|
|
if name == 'null' and hostName != 'null':
|
|
name = hostName
|
|
|
|
tmpPlugObj = plugin_object_class(
|
|
us['mac'],
|
|
us['ip'],
|
|
name,
|
|
get_unifi_val(us, 'oui'),
|
|
'Other',
|
|
1,
|
|
get_unifi_val(us, 'connection_network_name')
|
|
)
|
|
|
|
newEntries.append(tmpPlugObj)
|
|
|
|
return newEntries
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
def get_unifi_val(obj, key):
|
|
|
|
res = ''
|
|
|
|
if key in obj:
|
|
res = obj[key]
|
|
|
|
if res not in ['','None']:
|
|
return res
|
|
|
|
if obj.get(key) is not None:
|
|
res = obj.get(key)
|
|
|
|
if res not in ['','None']:
|
|
return res
|
|
|
|
return 'null'
|
|
|
|
# -------------------------------------------------------------------
|
|
class plugin_object_class:
|
|
def __init__(self, primaryId = '',secondaryId = '', watched1 = '',watched2 = '',watched3 = '',watched4 = '',extra = '',foreignKey = ''):
|
|
self.pluginPref = ''
|
|
self.primaryId = primaryId
|
|
self.secondaryId = secondaryId
|
|
self.created = strftime("%Y-%m-%d %H:%M:%S")
|
|
self.changed = ''
|
|
self.watched1 = watched1
|
|
self.watched2 = watched2
|
|
self.watched3 = watched3
|
|
self.watched4 = watched4
|
|
self.status = ''
|
|
self.extra = extra
|
|
self.userData = ''
|
|
self.foreignKey = foreignKey
|
|
|
|
# -----------------------------------------------------------------------------
|
|
def service_monitoring_log(primaryId, secondaryId, created, watched1, watched2 = 'null', watched3 = 'null', watched4 = 'null', extra ='null', foreignKey ='null' ):
|
|
|
|
if watched1 == '':
|
|
watched1 = 'null'
|
|
if watched2 == '':
|
|
watched2 = 'null'
|
|
if watched3 == '':
|
|
watched3 = 'null'
|
|
if watched4 == '':
|
|
watched4 = 'null'
|
|
if extra == '':
|
|
extra = 'null'
|
|
if foreignKey == '':
|
|
foreignKey = 'null'
|
|
|
|
with open(last_run, 'a') as last_run_logfile:
|
|
last_run_logfile.write("{}|{}|{}|{}|{}|{}|{}|{}|{}\n".format(
|
|
primaryId,
|
|
secondaryId,
|
|
created,
|
|
watched1,
|
|
watched2,
|
|
watched3,
|
|
watched4,
|
|
extra,
|
|
foreignKey
|
|
)
|
|
)
|
|
|
|
|
|
#===============================================================================
|
|
# BEGIN
|
|
#===============================================================================
|
|
if __name__ == '__main__':
|
|
main()
|
|
|