split publishers

This commit is contained in:
Data-Monkey
2023-05-29 16:35:22 +10:00
parent f50e3d4e92
commit 5b05be24ad
11 changed files with 534 additions and 643 deletions

View File

@@ -13,7 +13,7 @@ import time
from pathlib import Path from pathlib import Path
import requests import requests
import conf import conf
from const import * from const import *
from logger import mylog, logResult from logger import mylog, logResult
@@ -27,29 +27,29 @@ def timeNowTZ():
return datetime.datetime.now(conf.tz).replace(microsecond=0) return datetime.datetime.now(conf.tz).replace(microsecond=0)
#------------------------------------------------------------------------------- #-------------------------------------------------------------------------------
def updateState(db, newState): def updateState(db, newState):
# ?? Why is the state written to the DB? # ?? Why is the state written to the DB?
#sql = db.sql #sql = db.sql
mylog('debug', '[updateState] changing state to: "' + newState +'"') mylog('debug', '[updateState] changing state to: "' + newState +'"')
db.sql.execute ("UPDATE Parameters SET par_Value='"+ newState +"' WHERE par_ID='Back_App_State'") db.sql.execute ("UPDATE Parameters SET par_Value='"+ newState +"' WHERE par_ID='Back_App_State'")
db.commitDB() db.commitDB()
#------------------------------------------------------------------------------- #-------------------------------------------------------------------------------
def updateSubnets(scan_subnets): def updateSubnets(scan_subnets):
subnets = [] subnets = []
# multiple interfaces # multiple interfaces
if type(scan_subnets) is list: if type(scan_subnets) is list:
for interface in scan_subnets : for interface in scan_subnets :
subnets.append(interface) subnets.append(interface)
# one interface only # one interface only
else: else:
subnets.append(scan_subnets) subnets.append(scan_subnets)
return subnets return subnets
@@ -57,7 +57,7 @@ def updateSubnets(scan_subnets):
# check RW access of DB and config file # check RW access of DB and config file
def checkPermissionsOK(): def checkPermissionsOK():
#global confR_access, confW_access, dbR_access, dbW_access #global confR_access, confW_access, dbR_access, dbW_access
confR_access = (os.access(fullConfPath, os.R_OK)) confR_access = (os.access(fullConfPath, os.R_OK))
confW_access = (os.access(fullConfPath, os.W_OK)) confW_access = (os.access(fullConfPath, os.W_OK))
dbR_access = (os.access(fullDbPath, os.R_OK)) dbR_access = (os.access(fullDbPath, os.R_OK))
@@ -72,14 +72,14 @@ def checkPermissionsOK():
mylog('none', [ " " , dbPath , " | " , " WRITE | " , dbW_access]) mylog('none', [ " " , dbPath , " | " , " WRITE | " , dbW_access])
mylog('none', ['------------------------------------------------']) mylog('none', ['------------------------------------------------'])
#return dbR_access and dbW_access and confR_access and confW_access #return dbR_access and dbW_access and confR_access and confW_access
return (confR_access, dbR_access) return (confR_access, dbR_access)
#------------------------------------------------------------------------------- #-------------------------------------------------------------------------------
def fixPermissions(): def fixPermissions():
# Try fixing access rights if needed # Try fixing access rights if needed
chmodCommands = [] chmodCommands = []
chmodCommands.append(['sudo', 'chmod', 'a+rw', '-R', fullDbPath]) chmodCommands.append(['sudo', 'chmod', 'a+rw', '-R', fullDbPath])
chmodCommands.append(['sudo', 'chmod', 'a+rw', '-R', fullConfPath]) chmodCommands.append(['sudo', 'chmod', 'a+rw', '-R', fullConfPath])
for com in chmodCommands: for com in chmodCommands:
@@ -90,7 +90,7 @@ def fixPermissions():
result = subprocess.check_output (com, universal_newlines=True) result = subprocess.check_output (com, universal_newlines=True)
except subprocess.CalledProcessError as e: except subprocess.CalledProcessError as e:
# An error occured, handle it # An error occured, handle it
mylog('none', ["[Setup] Fix Failed. Execute this command manually inside of the container: ", ' '.join(com)]) mylog('none', ["[Setup] Fix Failed. Execute this command manually inside of the container: ", ' '.join(com)])
mylog('none', [e.output]) mylog('none', [e.output])
@@ -111,7 +111,7 @@ def initialiseFile(pathToCheck, defaultFile):
# write stdout and stderr into .log files for debugging if needed # write stdout and stderr into .log files for debugging if needed
logResult (stdout, stderr) # TO-DO should be changed to mylog logResult (stdout, stderr) # TO-DO should be changed to mylog
except subprocess.CalledProcessError as e: except subprocess.CalledProcessError as e:
# An error occured, handle it # An error occured, handle it
mylog('none', ["[Setup] Error copying ("+defaultFile+"). Make sure the app has Read & Write access to " + pathToCheck]) mylog('none', ["[Setup] Error copying ("+defaultFile+"). Make sure the app has Read & Write access to " + pathToCheck])
@@ -130,7 +130,7 @@ def filePermissions():
initialiseFile(fullDbPath, "/home/pi/pialert/back/pialert.db_bak") initialiseFile(fullDbPath, "/home/pi/pialert/back/pialert.db_bak")
# last attempt # last attempt
fixPermissions() fixPermissions()
#------------------------------------------------------------------------------- #-------------------------------------------------------------------------------
@@ -139,7 +139,7 @@ def bytes_to_string(value):
# if value is of type bytes, convert to string # if value is of type bytes, convert to string
if isinstance(value, bytes): if isinstance(value, bytes):
value = value.decode('utf-8') value = value.decode('utf-8')
return value return value
#------------------------------------------------------------------------------- #-------------------------------------------------------------------------------
@@ -152,21 +152,15 @@ def if_byte_then_to_str(input):
#------------------------------------------------------------------------------- #-------------------------------------------------------------------------------
def collect_lang_strings(db, json, pref): def collect_lang_strings(db, json, pref):
for prop in json["localized"]: for prop in json["localized"]:
for language_string in json[prop]: for language_string in json[prop]:
import_language_string(db, language_string["language_code"], pref + "_" + prop, language_string["string"]) import_language_string(db, language_string["language_code"], pref + "_" + prop, language_string["string"])
#------------------------------------------------------------------------------- #-------------------------------------------------------------------------------
# Creates a JSON object from a DB row # Creates a JSON object from a DB row
def row_to_json(names, row): def row_to_json(names, row):
rowEntry = {} rowEntry = {}
index = 0 index = 0
@@ -179,7 +173,7 @@ def row_to_json(names, row):
#------------------------------------------------------------------------------- #-------------------------------------------------------------------------------
def import_language_string(db, code, key, value, extra = ""): def import_language_string(db, code, key, value, extra = ""):
db.sql.execute ("""INSERT INTO Plugins_Language_Strings ("Language_Code", "String_Key", "String_Value", "Extra") VALUES (?, ?, ?, ?)""", (str(code), str(key), str(value), str(extra))) db.sql.execute ("""INSERT INTO Plugins_Language_Strings ("Language_Code", "String_Key", "String_Value", "Extra") VALUES (?, ?, ?, ?)""", (str(code), str(key), str(value), str(extra)))
db.commitDB() db.commitDB()
@@ -198,13 +192,13 @@ def checkIPV4(ip):
#------------------------------------------------------------------------------- #-------------------------------------------------------------------------------
def isNewVersion(newVersion: bool): def isNewVersion(newVersion: bool):
if newVersion == False: if newVersion == False:
f = open(pialertPath + '/front/buildtimestamp.txt', 'r') f = open(pialertPath + '/front/buildtimestamp.txt', 'r')
buildTimestamp = int(f.read().strip()) buildTimestamp = int(f.read().strip())
f.close() f.close()
data = "" data = ""
@@ -213,19 +207,19 @@ def isNewVersion(newVersion: bool):
text = url.text text = url.text
data = json.loads(text) data = json.loads(text)
except requests.exceptions.ConnectionError as e: except requests.exceptions.ConnectionError as e:
mylog('info', [" Couldn't check for new release."]) mylog('info', [" Couldn't check for new release."])
data = "" data = ""
# make sure we received a valid response and not an API rate limit exceeded message # make sure we received a valid response and not an API rate limit exceeded message
if data != "" and len(data) > 0 and isinstance(data, list) and "published_at" in data[0]: if data != "" and len(data) > 0 and isinstance(data, list) and "published_at" in data[0]:
dateTimeStr = data[0]["published_at"] dateTimeStr = data[0]["published_at"]
realeaseTimestamp = int(datetime.datetime.strptime(dateTimeStr, '%Y-%m-%dT%H:%M:%SZ').strftime('%s')) realeaseTimestamp = int(datetime.datetime.strptime(dateTimeStr, '%Y-%m-%dT%H:%M:%SZ').strftime('%s'))
if realeaseTimestamp > buildTimestamp + 600: if realeaseTimestamp > buildTimestamp + 600:
mylog('none', [" New version of the container available!"]) mylog('none', [" New version of the container available!"])
newVersion = True newVersion = True
# updateState(db, 'Back_New_Version_Available', str(newVersionAvailable)) ## TO DO add this back in but avoid circular ref with database # updateState(db, 'Back_New_Version_Available', str(newVersionAvailable)) ## TO DO add this back in but avoid circular ref with database
return newVersion return newVersion
@@ -237,7 +231,7 @@ def hide_email(email):
if len(m) == 2: if len(m) == 2:
return f'{m[0][0]}{"*"*(len(m[0])-2)}{m[0][-1] if len(m[0]) > 1 else ""}@{m[1]}' return f'{m[0][0]}{"*"*(len(m[0])-2)}{m[0][-1] if len(m[0]) > 1 else ""}@{m[1]}'
return email return email
#------------------------------------------------------------------------------- #-------------------------------------------------------------------------------
def removeDuplicateNewLines(text): def removeDuplicateNewLines(text):
@@ -250,14 +244,14 @@ def removeDuplicateNewLines(text):
def add_json_list (row, list): def add_json_list (row, list):
new_row = [] new_row = []
for column in row : for column in row :
column = bytes_to_string(column) column = bytes_to_string(column)
new_row.append(column) new_row.append(column)
list.append(new_row) list.append(new_row)
return list return list
#------------------------------------------------------------------------------- #-------------------------------------------------------------------------------
@@ -275,7 +269,7 @@ def generate_mac_links (html, deviceUrl):
MACs = re.findall(p, html) MACs = re.findall(p, html)
for mac in MACs: for mac in MACs:
html = html.replace('<td>' + mac + '</td>','<td><a href="' + deviceUrl + mac + '">' + mac + '</a></td>') html = html.replace('<td>' + mac + '</td>','<td><a href="' + deviceUrl + mac + '">' + mac + '</a></td>')
return html return html
@@ -283,40 +277,47 @@ def generate_mac_links (html, deviceUrl):
#------------------------------------------------------------------------------- #-------------------------------------------------------------------------------
def initOrSetParam(db, parID, parValue): def initOrSetParam(db, parID, parValue):
sql = db.sql sql = db.sql
sql.execute ("INSERT INTO Parameters(par_ID, par_Value) VALUES('"+str(parID)+"', '"+str(parValue)+"') ON CONFLICT(par_ID) DO UPDATE SET par_Value='"+str(parValue)+"' where par_ID='"+str(parID)+"'") sql.execute ("INSERT INTO Parameters(par_ID, par_Value) VALUES('"+str(parID)+"', '"+str(parValue)+"') ON CONFLICT(par_ID) DO UPDATE SET par_Value='"+str(parValue)+"' where par_ID='"+str(parID)+"'")
db.commitDB() db.commitDB()
#------------------------------------------------------------------------------- #-------------------------------------------------------------------------------
class json_struc: class json_struc:
def __init__(self, jsn, columnNames): def __init__(self, jsn, columnNames):
self.json = jsn self.json = jsn
self.columnNames = columnNames self.columnNames = columnNames
#------------------------------------------------------------------------------- #-------------------------------------------------------------------------------
def get_file_content(path): def get_file_content(path):
f = open(path, 'r') f = open(path, 'r')
content = f.read() content = f.read()
f.close() f.close()
return content return content
#------------------------------------------------------------------------------- #-------------------------------------------------------------------------------
def write_file (pPath, pText): def write_file (pPath, pText):
# Write the text depending using the correct python version # Write the text depending using the correct python version
if sys.version_info < (3, 0): if sys.version_info < (3, 0):
file = io.open (pPath , mode='w', encoding='utf-8') file = io.open (pPath , mode='w', encoding='utf-8')
file.write ( pText.decode('unicode_escape') ) file.write ( pText.decode('unicode_escape') )
file.close() file.close()
else: else:
file = open (pPath, 'w', encoding='utf-8') file = open (pPath, 'w', encoding='utf-8')
if pText is None: if pText is None:
pText = "" pText = ""
file.write (pText) file.write (pText)
file.close() file.close()
#-------------------------------------------------------------------------------
class noti_struc:
def __init__(self, json, text, html):
self.json = json
self.text = text
self.html = html

View File

@@ -1,244 +0,0 @@
import time
import re
from paho.mqtt import client as mqtt_client
import conf
from logger import mylog
from database import get_all_devices, get_device_stats
from helper import bytes_to_string, sanitize_string
#-------------------------------------------------------------------------------
# MQTT
#-------------------------------------------------------------------------------
mqtt_connected_to_broker = False
mqtt_sensors = []
#-------------------------------------------------------------------------------
class sensor_config:
def __init__(self, deviceId, deviceName, sensorType, sensorName, icon):
self.deviceId = deviceId
self.deviceName = deviceName
self.sensorType = sensorType
self.sensorName = sensorName
self.icon = icon
self.hash = str(hash(str(deviceId) + str(deviceName)+ str(sensorType)+ str(sensorName)+ str(icon)))
#-------------------------------------------------------------------------------
def publish_mqtt(client, topic, message):
status = 1
while status != 0:
result = client.publish(
topic=topic,
payload=message,
qos=conf.MQTT_QOS,
retain=True,
)
status = result[0]
if status != 0:
mylog('info', ["Waiting to reconnect to MQTT broker"])
time.sleep(0.1)
return True
#-------------------------------------------------------------------------------
def create_generic_device(client):
deviceName = 'PiAlert'
deviceId = 'pialert'
create_sensor(client, deviceId, deviceName, 'sensor', 'online', 'wifi-check')
create_sensor(client, deviceId, deviceName, 'sensor', 'down', 'wifi-cancel')
create_sensor(client, deviceId, deviceName, 'sensor', 'all', 'wifi')
create_sensor(client, deviceId, deviceName, 'sensor', 'archived', 'wifi-lock')
create_sensor(client, deviceId, deviceName, 'sensor', 'new', 'wifi-plus')
create_sensor(client, deviceId, deviceName, 'sensor', 'unknown', 'wifi-alert')
#-------------------------------------------------------------------------------
def create_sensor(client, deviceId, deviceName, sensorType, sensorName, icon):
new_sensor_config = sensor_config(deviceId, deviceName, sensorType, sensorName, icon)
# check if config already in list and if not, add it, otherwise skip
global mqtt_sensors, uniqueSensorCount
is_unique = True
for sensor in mqtt_sensors:
if sensor.hash == new_sensor_config.hash:
is_unique = False
break
# save if unique
if is_unique:
publish_sensor(client, new_sensor_config)
#-------------------------------------------------------------------------------
def publish_sensor(client, sensorConf):
global mqtt_sensors
message = '{ \
"name":"'+ sensorConf.deviceName +' '+sensorConf.sensorName+'", \
"state_topic":"system-sensors/'+sensorConf.sensorType+'/'+sensorConf.deviceId+'/state", \
"value_template":"{{value_json.'+sensorConf.sensorName+'}}", \
"unique_id":"'+sensorConf.deviceId+'_sensor_'+sensorConf.sensorName+'", \
"device": \
{ \
"identifiers": ["'+sensorConf.deviceId+'_sensor"], \
"manufacturer": "PiAlert", \
"name":"'+sensorConf.deviceName+'" \
}, \
"icon":"mdi:'+sensorConf.icon+'" \
}'
topic='homeassistant/'+sensorConf.sensorType+'/'+sensorConf.deviceId+'/'+sensorConf.sensorName+'/config'
# add the sensor to the global list to keep track of succesfully added sensors
if publish_mqtt(client, topic, message):
# hack - delay adding to the queue in case the process is
time.sleep(conf.MQTT_DELAY_SEC) # restarted and previous publish processes aborted
# (it takes ~2s to update a sensor config on the broker)
mqtt_sensors.append(sensorConf)
#-------------------------------------------------------------------------------
def mqtt_create_client():
def on_disconnect(client, userdata, rc):
global mqtt_connected_to_broker
mqtt_connected_to_broker = False
# not sure is below line is correct / necessary
# client = mqtt_create_client()
def on_connect(client, userdata, flags, rc):
global mqtt_connected_to_broker
if rc == 0:
mylog('verbose', [" Connected to broker"])
mqtt_connected_to_broker = True # Signal connection
else:
mylog('none', [" Connection failed"])
mqtt_connected_to_broker = False
client = mqtt_client.Client('PiAlert') # Set Connecting Client ID
client.username_pw_set(conf.MQTT_USER, conf.MQTT_PASSWORD)
client.on_connect = on_connect
client.on_disconnect = on_disconnect
client.connect(conf.MQTT_BROKER, conf.MQTT_PORT)
client.loop_start()
return client
#-------------------------------------------------------------------------------
def mqtt_start():
global client, mqtt_connected_to_broker
if mqtt_connected_to_broker == False:
mqtt_connected_to_broker = True
client = mqtt_create_client()
# General stats
# Create a generic device for overal stats
create_generic_device(client)
# Get the data
row = get_device_stats()
columns = ["online","down","all","archived","new","unknown"]
payload = ""
# Update the values
for column in columns:
payload += '"'+column+'": ' + str(row[column]) +','
# Publish (warap into {} and remove last ',' from above)
publish_mqtt(client, "system-sensors/sensor/pialert/state",
'{ \
'+ payload[:-1] +'\
}'
)
# Specific devices
# Get all devices
devices = get_all_devices()
sec_delay = len(devices) * int(conf.MQTT_DELAY_SEC)*5
mylog('info', [" Estimated delay: ", (sec_delay), 's ', '(', round(sec_delay/60,1) , 'min)' ])
for device in devices:
# Create devices in Home Assistant - send config messages
deviceId = 'mac_' + device["dev_MAC"].replace(" ", "").replace(":", "_").lower()
deviceNameDisplay = re.sub('[^a-zA-Z0-9-_\s]', '', device["dev_Name"])
create_sensor(client, deviceId, deviceNameDisplay, 'sensor', 'last_ip', 'ip-network')
create_sensor(client, deviceId, deviceNameDisplay, 'binary_sensor', 'is_present', 'wifi')
create_sensor(client, deviceId, deviceNameDisplay, 'sensor', 'mac_address', 'folder-key-network')
create_sensor(client, deviceId, deviceNameDisplay, 'sensor', 'is_new', 'bell-alert-outline')
create_sensor(client, deviceId, deviceNameDisplay, 'sensor', 'vendor', 'cog')
# update device sensors in home assistant
publish_mqtt(client, 'system-sensors/sensor/'+deviceId+'/state',
'{ \
"last_ip": "' + device["dev_LastIP"] +'", \
"is_new": "' + str(device["dev_NewDevice"]) +'", \
"vendor": "' + sanitize_string(device["dev_Vendor"]) +'", \
"mac_address": "' + str(device["dev_MAC"]) +'" \
}'
)
publish_mqtt(client, 'system-sensors/binary_sensor/'+deviceId+'/state',
'{ \
"is_present": "' + to_binary_sensor(str(device["dev_PresentLastScan"])) +'"\
}'
)
# delete device / topic
# homeassistant/sensor/mac_44_ef_bf_c4_b1_af/is_present/config
# client.publish(
# topic="homeassistant/sensor/"+deviceId+"/is_present/config",
# payload="",
# qos=1,
# retain=True,
# )
# time.sleep(10)
#===============================================================================
# Home Assistant UTILs
#===============================================================================
def to_binary_sensor(input):
# In HA a binary sensor returns ON or OFF
result = "OFF"
# bytestring
if isinstance(input, str):
if input == "1":
result = "ON"
elif isinstance(input, int):
if input == 1:
result = "ON"
elif isinstance(input, bool):
if input == True:
result = "ON"
elif isinstance(input, bytes):
if bytes_to_string(input) == "1":
result = "ON"
return result

View File

@@ -0,0 +1,8 @@
""" Publishers for Pi.Alert """
"""
each publisher exposes:
- check_config () returning True / False
- send (message) returning True / Fasle
"""

View File

@@ -0,0 +1,42 @@
import json
import subprocess
import conf
from helper import noti_struc
from logger import logResult, mylog
#-------------------------------------------------------------------------------
def check_config():
if conf.APPRISE_URL == '' or conf.APPRISE_HOST == '':
mylog('none', ['[Check Config] Error: Apprise service not set up correctly. Check your pialert.conf APPRISE_* variables.'])
return False
else:
return True
#-------------------------------------------------------------------------------
def send (msg: noti_struc):
html = msg.html
text = msg.text
#Define Apprise compatible payload (https://github.com/caronc/apprise-api#stateless-solution)
payload = html
if conf.APPRISE_PAYLOAD == 'text':
payload = text
_json_payload={
"urls": conf.APPRISE_URL,
"title": "Pi.Alert Notifications",
"format": conf.APPRISE_PAYLOAD,
"body": payload
}
try:
# try runnning a subprocess
p = subprocess.Popen(["curl","-i","-X", "POST" ,"-H", "Content-Type:application/json" ,"-d", json.dumps(_json_payload), conf.APPRISE_HOST], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
stdout, stderr = p.communicate()
# write stdout and stderr into .log files for debugging if needed
logResult (stdout, stderr) # TO-DO should be changed to mylog
except subprocess.CalledProcessError as e:
# An error occured, handle it
mylog('none', [e.output])

View File

@@ -0,0 +1,88 @@
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
import smtplib
import conf
from helper import hide_email, noti_struc
from logger import mylog, print_log
#-------------------------------------------------------------------------------
def check_config ():
if conf.SMTP_SERVER == '' or conf.REPORT_FROM == '' or conf.REPORT_TO == '':
mylog('none', ['[Email Check Config] Error: Email service not set up correctly. Check your pialert.conf SMTP_*, REPORT_FROM and REPORT_TO variables.'])
return False
else:
return True
#-------------------------------------------------------------------------------
def send (msg: noti_struc):
pText = msg.text
pHTML = msg.html
mylog('debug', '[Send Email] REPORT_TO: ' + hide_email(str(conf.REPORT_TO)) + ' SMTP_USER: ' + hide_email(str(conf.SMTP_USER)))
# Compose email
msg = MIMEMultipart('alternative')
msg['Subject'] = 'Pi.Alert Report'
msg['From'] = conf.REPORT_FROM
msg['To'] = conf.REPORT_TO
msg.attach (MIMEText (pText, 'plain'))
msg.attach (MIMEText (pHTML, 'html'))
failedAt = ''
failedAt = print_log ('SMTP try')
try:
# Send mail
failedAt = print_log('Trying to open connection to ' + str(conf.SMTP_SERVER) + ':' + str(conf.SMTP_PORT))
if conf.SMTP_FORCE_SSL:
failedAt = print_log('SMTP_FORCE_SSL == True so using .SMTP_SSL()')
if conf.SMTP_PORT == 0:
failedAt = print_log('SMTP_PORT == 0 so sending .SMTP_SSL(SMTP_SERVER)')
smtp_connection = smtplib.SMTP_SSL(conf.SMTP_SERVER)
else:
failedAt = print_log('SMTP_PORT == 0 so sending .SMTP_SSL(SMTP_SERVER, SMTP_PORT)')
smtp_connection = smtplib.SMTP_SSL(conf.SMTP_SERVER, conf.SMTP_PORT)
else:
failedAt = print_log('SMTP_FORCE_SSL == False so using .SMTP()')
if conf.SMTP_PORT == 0:
failedAt = print_log('SMTP_PORT == 0 so sending .SMTP(SMTP_SERVER)')
smtp_connection = smtplib.SMTP (conf.SMTP_SERVER)
else:
failedAt = print_log('SMTP_PORT == 0 so sending .SMTP(SMTP_SERVER, SMTP_PORT)')
smtp_connection = smtplib.SMTP (conf.SMTP_SERVER, conf.SMTP_PORT)
failedAt = print_log('Setting SMTP debug level')
# Log level set to debug of the communication between SMTP server and client
if conf.LOG_LEVEL == 'debug':
smtp_connection.set_debuglevel(1)
failedAt = print_log( 'Sending .ehlo()')
smtp_connection.ehlo()
if not conf.SMTP_SKIP_TLS:
failedAt = print_log('SMTP_SKIP_TLS == False so sending .starttls()')
smtp_connection.starttls()
failedAt = print_log('SMTP_SKIP_TLS == False so sending .ehlo()')
smtp_connection.ehlo()
if not conf.SMTP_SKIP_LOGIN:
failedAt = print_log('SMTP_SKIP_LOGIN == False so sending .login()')
smtp_connection.login (conf.SMTP_USER, conf.SMTP_PASS)
failedAt = print_log('Sending .sendmail()')
smtp_connection.sendmail (conf.REPORT_FROM, conf.REPORT_TO, msg.as_string())
smtp_connection.quit()
except smtplib.SMTPAuthenticationError as e:
mylog('none', [' ERROR: Failed at - ', failedAt])
mylog('none', [' ERROR: Couldn\'t connect to the SMTP server (SMTPAuthenticationError), skipping Email (enable LOG_LEVEL=debug for more logging)'])
except smtplib.SMTPServerDisconnected as e:
mylog('none', [' ERROR: Failed at - ', failedAt])
mylog('none', [' ERROR: Couldn\'t connect to the SMTP server (SMTPServerDisconnected), skipping Email (enable LOG_LEVEL=debug for more logging)'])
mylog('debug', '[Send Email] Last executed - ' + str(failedAt))

View File

@@ -0,0 +1,36 @@
import conf
import requests
from base64 import b64encode
from logger import mylog, noti_struc
#-------------------------------------------------------------------------------
def check_config():
if conf.NTFY_HOST == '' or conf.NTFY_TOPIC == '':
mylog('none', ['[Check Config] Error: NTFY service not set up correctly. Check your pialert.conf NTFY_* variables.'])
return False
else:
return True
#-------------------------------------------------------------------------------
def send (msg: noti_struc):
_Text = msg.html
headers = {
"Title": "Pi.Alert Notification",
"Actions": "view, Open Dashboard, "+ conf.REPORT_DASHBOARD_URL,
"Priority": "urgent",
"Tags": "warning"
}
# if username and password are set generate hash and update header
if conf.NTFY_USER != "" and conf.NTFY_PASSWORD != "":
# Generate hash for basic auth
# usernamepassword = "{}:{}".format(conf.NTFY_USER,conf.NTFY_PASSWORD)
basichash = b64encode(bytes(conf.NTFY_USER + ':' + conf.NTFY_PASSWORD, "utf-8")).decode("ascii")
# add authorization header with hash
headers["Authorization"] = "Basic {}".format(basichash)
requests.post("{}/{}".format( conf.NTFY_HOST, conf.NTFY_TOPIC),
data=_Text,
headers=headers)

View File

@@ -0,0 +1,33 @@
import requests
import conf
from helper import noti_struc
from logger import mylog
#-------------------------------------------------------------------------------
def check_config():
if conf.PUSHSAFER_TOKEN == 'ApiKey':
mylog('none', ['[Check Config] Error: Pushsafer service not set up correctly. Check your pialert.conf PUSHSAFER_TOKEN variable.'])
return False
else:
return True
#-------------------------------------------------------------------------------
def send ( msg:noti_struc ):
_Text = msg.text
url = 'https://www.pushsafer.com/api'
post_fields = {
"t" : 'Pi.Alert Message',
"m" : _Text,
"s" : 11,
"v" : 3,
"i" : 148,
"c" : '#ef7f7f',
"d" : 'a',
"u" : conf.REPORT_DASHBOARD_URL,
"ut" : 'Open Pi.Alert',
"k" : conf.PUSHSAFER_TOKEN,
}
requests.post(url, data=post_fields)

View File

@@ -0,0 +1,98 @@
import json
import subprocess
import conf
from const import logPath
from helper import noti_struc, write_file
from logger import logResult, mylog
#-------------------------------------------------------------------------------
def check_config():
if conf.WEBHOOK_URL == '':
mylog('none', ['[Check Config] Error: Webhook service not set up correctly. Check your pialert.conf WEBHOOK_* variables.'])
return False
else:
return True
#-------------------------------------------------------------------------------
def send_webhook (msg: noti_struc):
# use data type based on specified payload type
if conf.WEBHOOK_PAYLOAD == 'json':
payloadData = msg.json
if conf.WEBHOOK_PAYLOAD == 'html':
payloadData = msg.html
if conf.WEBHOOK_PAYLOAD == 'text':
payloadData = to_text(msg.json) # TO DO can we just send msg.text?
# Define slack-compatible payload
_json_payload = { "text": payloadData } if conf.WEBHOOK_PAYLOAD == 'text' else {
"username": "Pi.Alert",
"text": "There are new notifications",
"attachments": [{
"title": "Pi.Alert Notifications",
"title_link": conf.REPORT_DASHBOARD_URL,
"text": payloadData
}]
}
# DEBUG - Write the json payload into a log file for debugging
write_file (logPath + '/webhook_payload.json', json.dumps(_json_payload))
# Using the Slack-Compatible Webhook endpoint for Discord so that the same payload can be used for both
if(conf.WEBHOOK_URL.startswith('https://discord.com/api/webhooks/') and not conf.WEBHOOK_URL.endswith("/slack")):
_WEBHOOK_URL = f"{conf.WEBHOOK_URL}/slack"
curlParams = ["curl","-i","-H", "Content-Type:application/json" ,"-d", json.dumps(_json_payload), _WEBHOOK_URL]
else:
_WEBHOOK_URL = conf.WEBHOOK_URL
curlParams = ["curl","-i","-X", conf.WEBHOOK_REQUEST_METHOD ,"-H", "Content-Type:application/json" ,"-d", json.dumps(_json_payload), _WEBHOOK_URL]
# execute CURL call
try:
# try runnning a subprocess
mylog('debug', '[send_webhook] curlParams: '+ curlParams)
p = subprocess.Popen(curlParams, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
stdout, stderr = p.communicate()
# write stdout and stderr into .log files for debugging if needed
logResult (stdout, stderr) # TO-DO should be changed to mylog
except subprocess.CalledProcessError as e:
# An error occured, handle it
mylog('none', ['[send_webhook]', e.output])
#-------------------------------------------------------------------------------
def to_text(_json):
payloadData = ""
if len(_json['internet']) > 0 and 'internet' in conf.INCLUDED_SECTIONS:
payloadData += "INTERNET\n"
for event in _json['internet']:
payloadData += event[3] + ' on ' + event[2] + '. ' + event[4] + '. New address:' + event[1] + '\n'
if len(_json['new_devices']) > 0 and 'new_devices' in conf.INCLUDED_SECTIONS:
payloadData += "NEW DEVICES:\n"
for event in _json['new_devices']:
if event[4] is None:
event[4] = event[11]
payloadData += event[1] + ' - ' + event[4] + '\n'
if len(_json['down_devices']) > 0 and 'down_devices' in conf.INCLUDED_SECTIONS:
write_file (logPath + '/down_devices_example.log', _json['down_devices'])
payloadData += 'DOWN DEVICES:\n'
for event in _json['down_devices']:
if event[4] is None:
event[4] = event[11]
payloadData += event[1] + ' - ' + event[4] + '\n'
if len(_json['events']) > 0 and 'events' in conf.INCLUDED_SECTIONS:
payloadData += "EVENTS:\n"
for event in _json['events']:
if event[8] != "Internet":
payloadData += event[8] + " on " + event[1] + " " + event[3] + " at " + event[2] + "\n"
return payloadData

View File

@@ -1,40 +1,41 @@
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
import datetime import datetime
import json import json
import smtplib
import socket import socket
from base64 import b64encode
import subprocess import subprocess
import requests import requests
from json2table import convert from json2table import convert
# pialert modules # pialert modules
import conf import conf
from const import pialertPath, logPath, apiPath from const import pialertPath, logPath, apiPath
from helper import generate_mac_links, removeDuplicateNewLines, timeNow, hide_email, json_struc, updateState, get_file_content, write_file from helper import noti_struc, generate_mac_links, removeDuplicateNewLines, timeNow, hide_email, updateState, get_file_content, write_file
from logger import logResult, mylog, print_log from logger import logResult, mylog, print_log
from mqtt import mqtt_start
from publishers.email import (check_config as email_check_config,
send as send_email )
from publishers.ntfy import (check_config as ntfy_check_config,
send as send_ntfy )
from publishers.apprise import (check_config as apprise_check_config,
send as send_apprise)
from publishers.webhook import (check_config as webhook_check_config,
send as send_webhook)
from publishers.pushsafer import (check_config as pushsafer_check_config,
send as send_pushsafer)
from publishers.mqtt import (check_config as mqtt_check_config,
mqtt_start )
#=============================================================================== #===============================================================================
# REPORTING # REPORTING
#=============================================================================== #===============================================================================
# create a json for webhook and mqtt notifications to provide further integration options # create a json for webhook and mqtt notifications to provide further integration options
json_final = [] json_final = []
#-------------------------------------------------------------------------------
class noti_struc:
def __init__(self, json, text, html):
self.json = json
self.text = text
self.html = html
#------------------------------------------------------------------------------- #-------------------------------------------------------------------------------
def construct_notifications(db, sqlQuery, tableTitle, skipText = False, suppliedJsonStruct = None): def construct_notifications(db, sqlQuery, tableTitle, skipText = False, suppliedJsonStruct = None):
@@ -55,7 +56,7 @@ def construct_notifications(db, sqlQuery, tableTitle, skipText = False, supplied
json_struc = suppliedJsonStruct json_struc = suppliedJsonStruct
jsn = json_struc.json jsn = json_struc.json
html = "" html = ""
text = "" text = ""
if len(jsn["data"]) > 0: if len(jsn["data"]) > 0:
@@ -68,13 +69,13 @@ def construct_notifications(db, sqlQuery, tableTitle, skipText = False, supplied
# prepare text-only message # prepare text-only message
if skipText == False: if skipText == False:
for device in jsn["data"]: for device in jsn["data"]:
for header in headers: for header in headers:
padding = "" padding = ""
if len(header) < 4: if len(header) < 4:
padding = "\t" padding = "\t"
text += text_line.format ( header + ': ' + padding, device[header]) text += text_line.format ( header + ': ' + padding, device[header])
text += '\n' text += '\n'
# Format HTML table headers # Format HTML table headers
@@ -86,7 +87,8 @@ def construct_notifications(db, sqlQuery, tableTitle, skipText = False, supplied
def send_notifications (db): def send_notifications (db, INCLUDED_SECTIONS = conf.INCLUDED_SECTIONS):
sql = db.sql #TO-DO sql = db.sql #TO-DO
global mail_text, mail_html, json_final, changedPorts_json_struc, partial_html, partial_txt, partial_json global mail_text, mail_html, json_final, changedPorts_json_struc, partial_html, partial_txt, partial_json
@@ -94,7 +96,7 @@ def send_notifications (db):
plugins_report = False plugins_report = False
# Reporting section # Reporting section
mylog('verbose', ['[Notification] Check if something to report']) mylog('verbose', ['[Notification] Check if something to report'])
# prepare variables for JSON construction # prepare variables for JSON construction
json_internet = [] json_internet = []
@@ -108,26 +110,26 @@ def send_notifications (db):
sql.execute ("""UPDATE Events SET eve_PendingAlertEmail = 0 sql.execute ("""UPDATE Events SET eve_PendingAlertEmail = 0
WHERE eve_PendingAlertEmail = 1 AND eve_EventType != 'Device Down' AND eve_MAC IN WHERE eve_PendingAlertEmail = 1 AND eve_EventType != 'Device Down' AND eve_MAC IN
( (
SELECT dev_MAC FROM Devices WHERE dev_AlertEvents = 0 SELECT dev_MAC FROM Devices WHERE dev_AlertEvents = 0
)""") )""")
sql.execute ("""UPDATE Events SET eve_PendingAlertEmail = 0 sql.execute ("""UPDATE Events SET eve_PendingAlertEmail = 0
WHERE eve_PendingAlertEmail = 1 AND eve_EventType = 'Device Down' AND eve_MAC IN WHERE eve_PendingAlertEmail = 1 AND eve_EventType = 'Device Down' AND eve_MAC IN
( (
SELECT dev_MAC FROM Devices WHERE dev_AlertDeviceDown = 0 SELECT dev_MAC FROM Devices WHERE dev_AlertDeviceDown = 0
)""") )""")
# Open text Template # Open text Template
template_file = open(pialertPath + '/back/report_template.txt', 'r') template_file = open(pialertPath + '/back/report_template.txt', 'r')
mail_text = template_file.read() mail_text = template_file.read()
template_file.close() template_file.close()
# Open html Template # Open html Template
template_file = open(pialertPath + '/back/report_template.html', 'r') template_file = open(pialertPath + '/back/report_template.html', 'r')
if conf.newVersionAvailable : if conf.newVersionAvailable :
template_file = open(pialertPath + '/back/report_template_new_version.html', 'r') template_file = open(pialertPath + '/back/report_template_new_version.html', 'r')
mail_html = template_file.read() mail_html = template_file.read()
template_file.close() template_file.close()
# Report Header & footer # Report Header & footer
timeFormated = timeNow().strftime ('%Y-%m-%d %H:%M') timeFormated = timeNow().strftime ('%Y-%m-%d %H:%M')
@@ -137,7 +139,7 @@ def send_notifications (db):
mail_text = mail_text.replace ('<SERVER_NAME>', socket.gethostname() ) mail_text = mail_text.replace ('<SERVER_NAME>', socket.gethostname() )
mail_html = mail_html.replace ('<SERVER_NAME>', socket.gethostname() ) mail_html = mail_html.replace ('<SERVER_NAME>', socket.gethostname() )
if 'internet' in conf.INCLUDED_SECTIONS: if 'internet' in INCLUDED_SECTIONS:
# Compose Internet Section # Compose Internet Section
sqlQuery = """SELECT eve_MAC as MAC, eve_IP as IP, eve_DateTime as Datetime, eve_EventType as "Event Type", eve_AdditionalInfo as "More info" FROM Events sqlQuery = """SELECT eve_MAC as MAC, eve_IP as IP, eve_DateTime as Datetime, eve_EventType as "Event Type", eve_AdditionalInfo as "More info" FROM Events
WHERE eve_PendingAlertEmail = 1 AND eve_MAC = 'Internet' WHERE eve_PendingAlertEmail = 1 AND eve_MAC = 'Internet'
@@ -145,14 +147,14 @@ def send_notifications (db):
notiStruc = construct_notifications(db, sqlQuery, "Internet IP change") notiStruc = construct_notifications(db, sqlQuery, "Internet IP change")
# collect "internet" (IP changes) for the webhook json # collect "internet" (IP changes) for the webhook json
json_internet = notiStruc.json["data"] json_internet = notiStruc.json["data"]
mail_text = mail_text.replace ('<SECTION_INTERNET>', notiStruc.text + '\n') mail_text = mail_text.replace ('<SECTION_INTERNET>', notiStruc.text + '\n')
mail_html = mail_html.replace ('<INTERNET_TABLE>', notiStruc.html) mail_html = mail_html.replace ('<INTERNET_TABLE>', notiStruc.html)
if 'new_devices' in conf.INCLUDED_SECTIONS: if 'new_devices' in INCLUDED_SECTIONS:
# Compose New Devices Section # Compose New Devices Section
sqlQuery = """SELECT eve_MAC as MAC, eve_DateTime as Datetime, dev_LastIP as IP, eve_EventType as "Event Type", dev_Name as "Device name", dev_Comments as Comments FROM Events_Devices sqlQuery = """SELECT eve_MAC as MAC, eve_DateTime as Datetime, dev_LastIP as IP, eve_EventType as "Event Type", dev_Name as "Device name", dev_Comments as Comments FROM Events_Devices
WHERE eve_PendingAlertEmail = 1 WHERE eve_PendingAlertEmail = 1
AND eve_EventType = 'New Device' AND eve_EventType = 'New Device'
@@ -160,14 +162,14 @@ def send_notifications (db):
notiStruc = construct_notifications(db, sqlQuery, "New devices") notiStruc = construct_notifications(db, sqlQuery, "New devices")
# collect "new_devices" for the webhook json # collect "new_devices" for the webhook json
json_new_devices = notiStruc.json["data"] json_new_devices = notiStruc.json["data"]
mail_text = mail_text.replace ('<SECTION_NEW_DEVICES>', notiStruc.text + '\n') mail_text = mail_text.replace ('<SECTION_NEW_DEVICES>', notiStruc.text + '\n')
mail_html = mail_html.replace ('<NEW_DEVICES_TABLE>', notiStruc.html) mail_html = mail_html.replace ('<NEW_DEVICES_TABLE>', notiStruc.html)
if 'down_devices' in conf.INCLUDED_SECTIONS: if 'down_devices' in INCLUDED_SECTIONS:
# Compose Devices Down Section # Compose Devices Down Section
sqlQuery = """SELECT eve_MAC as MAC, eve_DateTime as Datetime, dev_LastIP as IP, eve_EventType as "Event Type", dev_Name as "Device name", dev_Comments as Comments FROM Events_Devices sqlQuery = """SELECT eve_MAC as MAC, eve_DateTime as Datetime, dev_LastIP as IP, eve_EventType as "Event Type", dev_Name as "Device name", dev_Comments as Comments FROM Events_Devices
WHERE eve_PendingAlertEmail = 1 WHERE eve_PendingAlertEmail = 1
AND eve_EventType = 'Device Down' AND eve_EventType = 'Device Down'
@@ -175,14 +177,14 @@ def send_notifications (db):
notiStruc = construct_notifications(db, sqlQuery, "Down devices") notiStruc = construct_notifications(db, sqlQuery, "Down devices")
# collect "new_devices" for the webhook json # collect "new_devices" for the webhook json
json_down_devices = notiStruc.json["data"] json_down_devices = notiStruc.json["data"]
mail_text = mail_text.replace ('<SECTION_DEVICES_DOWN>', notiStruc.text + '\n') mail_text = mail_text.replace ('<SECTION_DEVICES_DOWN>', notiStruc.text + '\n')
mail_html = mail_html.replace ('<DOWN_DEVICES_TABLE>', notiStruc.html) mail_html = mail_html.replace ('<DOWN_DEVICES_TABLE>', notiStruc.html)
if 'events' in conf.INCLUDED_SECTIONS: if 'events' in INCLUDED_SECTIONS:
# Compose Events Section # Compose Events Section
sqlQuery = """SELECT eve_MAC as MAC, eve_DateTime as Datetime, dev_LastIP as IP, eve_EventType as "Event Type", dev_Name as "Device name", dev_Comments as Comments FROM Events_Devices sqlQuery = """SELECT eve_MAC as MAC, eve_DateTime as Datetime, dev_LastIP as IP, eve_EventType as "Event Type", dev_Name as "Device name", dev_Comments as Comments FROM Events_Devices
WHERE eve_PendingAlertEmail = 1 WHERE eve_PendingAlertEmail = 1
AND eve_EventType IN ('Connected','Disconnected', AND eve_EventType IN ('Connected','Disconnected',
@@ -191,16 +193,16 @@ def send_notifications (db):
notiStruc = construct_notifications(db, sqlQuery, "Events") notiStruc = construct_notifications(db, sqlQuery, "Events")
# collect "events" for the webhook json # collect "events" for the webhook json
json_events = notiStruc.json["data"] json_events = notiStruc.json["data"]
mail_text = mail_text.replace ('<SECTION_EVENTS>', notiStruc.text + '\n') mail_text = mail_text.replace ('<SECTION_EVENTS>', notiStruc.text + '\n')
mail_html = mail_html.replace ('<EVENTS_TABLE>', notiStruc.html) mail_html = mail_html.replace ('<EVENTS_TABLE>', notiStruc.html)
if 'ports' in conf.INCLUDED_SECTIONS: if 'ports' in INCLUDED_SECTIONS:
# collect "ports" for the webhook json # collect "ports" for the webhook json
if changedPorts_json_struc is not None: if changedPorts_json_struc is not None:
json_ports = changedPorts_json_struc.json["data"] json_ports = changedPorts_json_struc.json["data"]
notiStruc = construct_notifications(db, "", "Ports", True, changedPorts_json_struc) notiStruc = construct_notifications(db, "", "Ports", True, changedPorts_json_struc)
@@ -208,17 +210,17 @@ def send_notifications (db):
portsTxt = "" portsTxt = ""
if changedPorts_json_struc is not None: if changedPorts_json_struc is not None:
portsTxt = "Ports \n---------\n Ports changed! Check PiAlert for details!\n" portsTxt = "Ports \n---------\n Ports changed! Check PiAlert for details!\n"
mail_text = mail_text.replace ('<PORTS_TABLE>', portsTxt ) mail_text = mail_text.replace ('<PORTS_TABLE>', portsTxt )
if 'plugins' in conf.INCLUDED_SECTIONS and conf.ENABLE_PLUGINS: if 'plugins' in INCLUDED_SECTIONS and conf.ENABLE_PLUGINS:
# Compose Plugins Section # Compose Plugins Section
sqlQuery = """SELECT Plugin, Object_PrimaryId, Object_SecondaryId, DateTimeChanged, Watched_Value1, Watched_Value2, Watched_Value3, Watched_Value4, Status from Plugins_Events""" sqlQuery = """SELECT Plugin, Object_PrimaryId, Object_SecondaryId, DateTimeChanged, Watched_Value1, Watched_Value2, Watched_Value3, Watched_Value4, Status from Plugins_Events"""
notiStruc = construct_notifications(db, sqlQuery, "Plugins") notiStruc = construct_notifications(db, sqlQuery, "Plugins")
# collect "plugins" for the webhook json # collect "plugins" for the webhook json
json_plugins = notiStruc.json["data"] json_plugins = notiStruc.json["data"]
mail_text = mail_text.replace ('<PLUGINS_TABLE>', notiStruc.text + '\n') mail_text = mail_text.replace ('<PLUGINS_TABLE>', notiStruc.text + '\n')
@@ -229,42 +231,44 @@ def send_notifications (db):
json_final = { json_final = {
"internet": json_internet, "internet": json_internet,
"new_devices": json_new_devices, "new_devices": json_new_devices,
"down_devices": json_down_devices, "down_devices": json_down_devices,
"events": json_events, "events": json_events,
"ports": json_ports, "ports": json_ports,
"plugins": json_plugins, "plugins": json_plugins,
} }
mail_text = removeDuplicateNewLines(mail_text) mail_text = removeDuplicateNewLines(mail_text)
# Create clickable MAC links # Create clickable MAC links
mail_html = generate_mac_links (mail_html, deviceUrl) mail_html = generate_mac_links (mail_html, deviceUrl)
# Write output emails for debug # Write output emails for debug
write_file (logPath + '/report_output.json', json.dumps(json_final)) write_file (logPath + '/report_output.json', json.dumps(json_final))
write_file (logPath + '/report_output.txt', mail_text) write_file (logPath + '/report_output.txt', mail_text)
write_file (logPath + '/report_output.html', mail_html) write_file (logPath + '/report_output.html', mail_html)
# Send Mail # Send Mail
if json_internet != [] or json_new_devices != [] or json_down_devices != [] or json_events != [] or json_ports != [] or conf.debug_force_notification or plugins_report: if json_internet != [] or json_new_devices != [] or json_down_devices != [] or json_events != [] or json_ports != [] or conf.debug_force_notification or plugins_report:
mylog('none', ['[Notification] Changes detected, sending reports']) mylog('none', ['[Notification] Changes detected, sending reports'])
msg = noti_struc(json_final, mail_text, mail_html)
mylog('info', ['[Notification] Udateing API files']) mylog('info', ['[Notification] Udateing API files'])
send_api() send_api()
if conf.REPORT_MAIL and check_config('email'): if conf.REPORT_MAIL and check_config('email'):
updateState(db,"Send: Email") updateState(db,"Send: Email")
mylog('info', ['[Notification] Sending report by Email']) mylog('info', ['[Notification] Sending report by Email'])
send_email (mail_text, mail_html) send_email (msg )
else : else :
mylog('verbose', ['[Notification] Skip email']) mylog('verbose', ['[Notification] Skip email'])
if conf.REPORT_APPRISE and check_config('apprise'): if conf.REPORT_APPRISE and check_config('apprise'):
updateState(db,"Send: Apprise") updateState(db,"Send: Apprise")
mylog('info', ['[Notification] Sending report by Apprise']) mylog('info', ['[Notification] Sending report by Apprise'])
send_apprise (mail_html, mail_text) send_apprise (msg)
else : else :
mylog('verbose', ['[Notification] Skip Apprise']) mylog('verbose', ['[Notification] Skip Apprise'])
if conf.REPORT_WEBHOOK and check_config('webhook'): if conf.REPORT_WEBHOOK and check_config('webhook'):
@@ -276,20 +280,20 @@ def send_notifications (db):
if conf.REPORT_NTFY and check_config('ntfy'): if conf.REPORT_NTFY and check_config('ntfy'):
updateState(db,"Send: NTFY") updateState(db,"Send: NTFY")
mylog('info', ['[Notification] Sending report by NTFY']) mylog('info', ['[Notification] Sending report by NTFY'])
send_ntfy (mail_text) send_ntfy (msg)
else : else :
mylog('verbose', ['[Notification] Skip NTFY']) mylog('verbose', ['[Notification] Skip NTFY'])
if conf.REPORT_PUSHSAFER and check_config('pushsafer'): if conf.REPORT_PUSHSAFER and check_config('pushsafer'):
updateState(db,"Send: PUSHSAFER") updateState(db,"Send: PUSHSAFER")
mylog('info', ['[Notification] Sending report by PUSHSAFER']) mylog('info', ['[Notification] Sending report by PUSHSAFER'])
send_pushsafer (mail_text) send_pushsafer (msg)
else : else :
mylog('verbose', ['[Notification] Skip PUSHSAFER']) mylog('verbose', ['[Notification] Skip PUSHSAFER'])
# Update MQTT entities # Update MQTT entities
if conf.REPORT_MQTT and check_config('mqtt'): if conf.REPORT_MQTT and check_config('mqtt'):
updateState(db,"Send: MQTT") updateState(db,"Send: MQTT")
mylog('info', ['[Notification] Establishing MQTT thread']) mylog('info', ['[Notification] Establishing MQTT thread'])
mqtt_start() mqtt_start()
else : else :
mylog('verbose', ['[Notification] Skip MQTT']) mylog('verbose', ['[Notification] Skip MQTT'])
else : else :
@@ -305,13 +309,13 @@ def send_notifications (db):
# clear plugin events # clear plugin events
sql.execute ("DELETE FROM Plugins_Events") sql.execute ("DELETE FROM Plugins_Events")
changedPorts_json_struc = None changedPorts_json_struc = None
# DEBUG - print number of rows updated # DEBUG - print number of rows updated
mylog('info', ['[Notification] Notifications changes: ', sql.rowcount]) mylog('info', ['[Notification] Notifications changes: ', sql.rowcount])
# Commit changes # Commit changes
db.commitDB() db.commitDB()
@@ -319,53 +323,53 @@ def send_notifications (db):
def check_config(service): def check_config(service):
if service == 'email': if service == 'email':
if conf.SMTP_SERVER == '' or conf.REPORT_FROM == '' or conf.REPORT_TO == '': return email_check_config()
mylog('none', ['[Check Config] Error: Email service not set up correctly. Check your pialert.conf SMTP_*, REPORT_FROM and REPORT_TO variables.'])
return False # if conf.SMTP_SERVER == '' or conf.REPORT_FROM == '' or conf.REPORT_TO == '':
else: # mylog('none', ['[Check Config] Error: Email service not set up correctly. Check your pialert.conf SMTP_*, REPORT_FROM and REPORT_TO variables.'])
return True # return False
# else:
# return True
if service == 'apprise': if service == 'apprise':
if conf.APPRISE_URL == '' or conf.APPRISE_HOST == '': return apprise_check_config()
mylog('none', ['[Check Config] Error: Apprise service not set up correctly. Check your pialert.conf APPRISE_* variables.'])
return False # if conf.APPRISE_URL == '' or conf.APPRISE_HOST == '':
else: # mylog('none', ['[Check Config] Error: Apprise service not set up correctly. Check your pialert.conf APPRISE_* variables.'])
return True # return False
# else:
# return True
if service == 'webhook': if service == 'webhook':
if conf.WEBHOOK_URL == '': return webhook_check_config()
mylog('none', ['[Check Config] Error: Webhook service not set up correctly. Check your pialert.conf WEBHOOK_* variables.'])
return False # if conf.WEBHOOK_URL == '':
else: # mylog('none', ['[Check Config] Error: Webhook service not set up correctly. Check your pialert.conf WEBHOOK_* variables.'])
return True # return False
# else:
# return True
if service == 'ntfy': if service == 'ntfy':
if conf.NTFY_HOST == '' or conf.NTFY_TOPIC == '': return ntfy_check_config ()
mylog('none', ['[Check Config] Error: NTFY service not set up correctly. Check your pialert.conf NTFY_* variables.']) #
return False # if conf.NTFY_HOST == '' or conf.NTFY_TOPIC == '':
else: # mylog('none', ['[Check Config] Error: NTFY service not set up correctly. Check your pialert.conf NTFY_* variables.'])
return True # return False
# else:
# return True
if service == 'pushsafer': if service == 'pushsafer':
if conf.PUSHSAFER_TOKEN == 'ApiKey': return pushsafer_check_config()
mylog('none', ['[Check Config] Error: Pushsafer service not set up correctly. Check your pialert.conf PUSHSAFER_TOKEN variable.'])
return False
else:
return True
if service == 'mqtt': if service == 'mqtt':
if conf.MQTT_BROKER == '' or conf.MQTT_PORT == '' or conf.MQTT_USER == '' or conf.MQTT_PASSWORD == '': return mqtt_check_config()
mylog('none', ['[Check Config] Error: MQTT service not set up correctly. Check your pialert.conf MQTT_* variables.'])
return False
else:
return True
#------------------------------------------------------------------------------- #-------------------------------------------------------------------------------
def format_table (html, thValue, props, newThValue = ''): def format_table (html, thValue, props, newThValue = ''):
if newThValue == '': if newThValue == '':
newThValue = thValue newThValue = thValue
return html.replace("<th>"+thValue+"</th>", "<th "+props+" >"+newThValue+"</th>" ) return html.replace("<th>"+thValue+"</th>", "<th "+props+" >"+newThValue+"</th>" )
#------------------------------------------------------------------------------- #-------------------------------------------------------------------------------
@@ -375,9 +379,9 @@ def format_report_section (pActive, pSection, pTable, pText, pHTML):
# Replace section text # Replace section text
if pActive : if pActive :
conf.mail_text = conf.mail_text.replace ('<'+ pTable +'>', pText) conf.mail_text = conf.mail_text.replace ('<'+ pTable +'>', pText)
conf.mail_html = conf.mail_html.replace ('<'+ pTable +'>', pHTML) conf.mail_html = conf.mail_html.replace ('<'+ pTable +'>', pHTML)
conf.mail_text = remove_tag (conf.mail_text, pSection) conf.mail_text = remove_tag (conf.mail_text, pSection)
conf.mail_html = remove_tag (conf.mail_html, pSection) conf.mail_html = remove_tag (conf.mail_html, pSection)
else: else:
conf.mail_text = remove_section (conf.mail_text, pSection) conf.mail_text = remove_section (conf.mail_text, pSection)
@@ -387,7 +391,7 @@ def format_report_section (pActive, pSection, pTable, pText, pHTML):
def remove_section (pText, pSection): def remove_section (pText, pSection):
# Search section into the text # Search section into the text
if pText.find ('<'+ pSection +'>') >=0 \ if pText.find ('<'+ pSection +'>') >=0 \
and pText.find ('</'+ pSection +'>') >=0 : and pText.find ('</'+ pSection +'>') >=0 :
# return text without the section # return text without the section
return pText[:pText.find ('<'+ pSection+'>')] + \ return pText[:pText.find ('<'+ pSection+'>')] + \
pText[pText.find ('</'+ pSection +'>') + len (pSection) +3:] pText[pText.find ('</'+ pSection +'>') + len (pSection) +3:]
@@ -402,215 +406,8 @@ def remove_tag (pText, pTag):
#------------------------------------------------------------------------------- #-------------------------------------------------------------------------------
# Reporting # Reporting
#------------------------------------------------------------------------------- #-------------------------------------------------------------------------------
def send_email (pText, pHTML):
mylog('debug', '[Send Email] REPORT_TO: ' + hide_email(str(conf.REPORT_TO)) + ' SMTP_USER: ' + hide_email(str(conf.SMTP_USER)))
# Compose email
msg = MIMEMultipart('alternative')
msg['Subject'] = 'Pi.Alert Report'
msg['From'] = conf.REPORT_FROM
msg['To'] = conf.REPORT_TO
msg.attach (MIMEText (pText, 'plain'))
msg.attach (MIMEText (pHTML, 'html'))
failedAt = ''
failedAt = print_log ('SMTP try')
try:
# Send mail
failedAt = print_log('Trying to open connection to ' + str(conf.SMTP_SERVER) + ':' + str(conf.SMTP_PORT))
if conf.SMTP_FORCE_SSL:
failedAt = print_log('SMTP_FORCE_SSL == True so using .SMTP_SSL()')
if conf.SMTP_PORT == 0:
failedAt = print_log('SMTP_PORT == 0 so sending .SMTP_SSL(SMTP_SERVER)')
smtp_connection = smtplib.SMTP_SSL(conf.SMTP_SERVER)
else:
failedAt = print_log('SMTP_PORT == 0 so sending .SMTP_SSL(SMTP_SERVER, SMTP_PORT)')
smtp_connection = smtplib.SMTP_SSL(conf.SMTP_SERVER, conf.SMTP_PORT)
else:
failedAt = print_log('SMTP_FORCE_SSL == False so using .SMTP()')
if conf.SMTP_PORT == 0:
failedAt = print_log('SMTP_PORT == 0 so sending .SMTP(SMTP_SERVER)')
smtp_connection = smtplib.SMTP (conf.SMTP_SERVER)
else:
failedAt = print_log('SMTP_PORT == 0 so sending .SMTP(SMTP_SERVER, SMTP_PORT)')
smtp_connection = smtplib.SMTP (conf.SMTP_SERVER, conf.SMTP_PORT)
failedAt = print_log('Setting SMTP debug level')
# Log level set to debug of the communication between SMTP server and client
if conf.LOG_LEVEL == 'debug':
smtp_connection.set_debuglevel(1)
failedAt = print_log( 'Sending .ehlo()')
smtp_connection.ehlo()
if not conf.SMTP_SKIP_TLS:
failedAt = print_log('SMTP_SKIP_TLS == False so sending .starttls()')
smtp_connection.starttls()
failedAt = print_log('SMTP_SKIP_TLS == False so sending .ehlo()')
smtp_connection.ehlo()
if not conf.SMTP_SKIP_LOGIN:
failedAt = print_log('SMTP_SKIP_LOGIN == False so sending .login()')
smtp_connection.login (conf.SMTP_USER, conf.SMTP_PASS)
failedAt = print_log('Sending .sendmail()')
smtp_connection.sendmail (conf.REPORT_FROM, conf.REPORT_TO, msg.as_string())
smtp_connection.quit()
except smtplib.SMTPAuthenticationError as e:
mylog('none', [' ERROR: Failed at - ', failedAt])
mylog('none', [' ERROR: Couldn\'t connect to the SMTP server (SMTPAuthenticationError), skipping Email (enable LOG_LEVEL=debug for more logging)'])
except smtplib.SMTPServerDisconnected as e:
mylog('none', [' ERROR: Failed at - ', failedAt])
mylog('none', [' ERROR: Couldn\'t connect to the SMTP server (SMTPServerDisconnected), skipping Email (enable LOG_LEVEL=debug for more logging)'])
mylog('debug', '[Send Email] Last executed - ' + str(failedAt))
#-------------------------------------------------------------------------------
def send_ntfy (_Text):
headers = {
"Title": "Pi.Alert Notification",
"Actions": "view, Open Dashboard, "+ conf.REPORT_DASHBOARD_URL,
"Priority": "urgent",
"Tags": "warning"
}
# if username and password are set generate hash and update header
if conf.NTFY_USER != "" and conf.NTFY_PASSWORD != "":
# Generate hash for basic auth
usernamepassword = "{}:{}".format(conf.NTFY_USER,conf.NTFY_PASSWORD)
basichash = b64encode(bytes(conf.NTFY_USER + ':' + conf.NTFY_PASSWORD, "utf-8")).decode("ascii")
# add authorization header with hash
headers["Authorization"] = "Basic {}".format(basichash)
requests.post("{}/{}".format( conf.NTFY_HOST, conf.NTFY_TOPIC),
data=_Text,
headers=headers)
def send_pushsafer (_Text):
url = 'https://www.pushsafer.com/api'
post_fields = {
"t" : 'Pi.Alert Message',
"m" : _Text,
"s" : 11,
"v" : 3,
"i" : 148,
"c" : '#ef7f7f',
"d" : 'a',
"u" : conf.REPORT_DASHBOARD_URL,
"ut" : 'Open Pi.Alert',
"k" : conf.PUSHSAFER_TOKEN,
}
requests.post(url, data=post_fields)
#-------------------------------------------------------------------------------
def send_webhook (_json, _html):
# use data type based on specified payload type
if conf.WEBHOOK_PAYLOAD == 'json':
payloadData = _json
if conf.WEBHOOK_PAYLOAD == 'html':
payloadData = _html
if conf.WEBHOOK_PAYLOAD == 'text':
payloadData = to_text(_json)
# Define slack-compatible payload
_json_payload = { "text": payloadData } if conf.WEBHOOK_PAYLOAD == 'text' else {
"username": "Pi.Alert",
"text": "There are new notifications",
"attachments": [{
"title": "Pi.Alert Notifications",
"title_link": conf.REPORT_DASHBOARD_URL,
"text": payloadData
}]
}
# DEBUG - Write the json payload into a log file for debugging
write_file (logPath + '/webhook_payload.json', json.dumps(_json_payload))
# Using the Slack-Compatible Webhook endpoint for Discord so that the same payload can be used for both
if(conf.WEBHOOK_URL.startswith('https://discord.com/api/webhooks/') and not conf.WEBHOOK_URL.endswith("/slack")):
_WEBHOOK_URL = f"{conf.WEBHOOK_URL}/slack"
curlParams = ["curl","-i","-H", "Content-Type:application/json" ,"-d", json.dumps(_json_payload), _WEBHOOK_URL]
else:
_WEBHOOK_URL = conf.WEBHOOK_URL
curlParams = ["curl","-i","-X", conf.WEBHOOK_REQUEST_METHOD ,"-H", "Content-Type:application/json" ,"-d", json.dumps(_json_payload), _WEBHOOK_URL]
# execute CURL call
try:
# try runnning a subprocess
mylog('debug', '[send_webhook] curlParams: '+ curlParams)
p = subprocess.Popen(curlParams, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
stdout, stderr = p.communicate()
# write stdout and stderr into .log files for debugging if needed
logResult (stdout, stderr) # TO-DO should be changed to mylog
except subprocess.CalledProcessError as e:
# An error occured, handle it
mylog('none', ['[send_webhook]', e.output])
#-------------------------------------------------------------------------------
def send_apprise (html, text):
#Define Apprise compatible payload (https://github.com/caronc/apprise-api#stateless-solution)
payload = html
if conf.APPRISE_PAYLOAD == 'text':
payload = text
_json_payload={
"urls": conf.APPRISE_URL,
"title": "Pi.Alert Notifications",
"format": conf.APPRISE_PAYLOAD,
"body": payload
}
try:
# try runnning a subprocess
p = subprocess.Popen(["curl","-i","-X", "POST" ,"-H", "Content-Type:application/json" ,"-d", json.dumps(_json_payload), conf.APPRISE_HOST], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
stdout, stderr = p.communicate()
# write stdout and stderr into .log files for debugging if needed
logResult (stdout, stderr) # TO-DO should be changed to mylog
except subprocess.CalledProcessError as e:
# An error occured, handle it
mylog('none', [e.output])
def to_text(_json):
payloadData = ""
if len(_json['internet']) > 0 and 'internet' in conf.INCLUDED_SECTIONS:
payloadData += "INTERNET\n"
for event in _json['internet']:
payloadData += event[3] + ' on ' + event[2] + '. ' + event[4] + '. New address:' + event[1] + '\n'
if len(_json['new_devices']) > 0 and 'new_devices' in conf.INCLUDED_SECTIONS:
payloadData += "NEW DEVICES:\n"
for event in _json['new_devices']:
if event[4] is None:
event[4] = event[11]
payloadData += event[1] + ' - ' + event[4] + '\n'
if len(_json['down_devices']) > 0 and 'down_devices' in conf.INCLUDED_SECTIONS:
write_file (logPath + '/down_devices_example.log', _json['down_devices'])
payloadData += 'DOWN DEVICES:\n'
for event in _json['down_devices']:
if event[4] is None:
event[4] = event[11]
payloadData += event[1] + ' - ' + event[4] + '\n'
if len(_json['events']) > 0 and 'events' in conf.INCLUDED_SECTIONS:
payloadData += "EVENTS:\n"
for event in _json['events']:
if event[8] != "Internet":
payloadData += event[8] + " on " + event[1] + " " + event[3] + " at " + event[2] + "\n"
return payloadData
#------------------------------------------------------------------------------- #-------------------------------------------------------------------------------
def send_api(): def send_api():
@@ -618,11 +415,11 @@ def send_api():
write_file(apiPath + 'notification_text.txt' , mail_text) write_file(apiPath + 'notification_text.txt' , mail_text)
write_file(apiPath + 'notification_text.html' , mail_html) write_file(apiPath + 'notification_text.html' , mail_html)
write_file(apiPath + 'notification_json_final.json' , json.dumps(json_final)) write_file(apiPath + 'notification_json_final.json' , json.dumps(json_final))
#------------------------------------------------------------------------------- #-------------------------------------------------------------------------------
def skip_repeated_notifications (db): def skip_repeated_notifications (db):
# Skip repeated notifications # Skip repeated notifications
# due strfime : Overflow --> use "strftime / 60" # due strfime : Overflow --> use "strftime / 60"
@@ -640,7 +437,7 @@ def skip_repeated_notifications (db):
""" ) """ )
mylog('verbose','[Skip Repeated Notifications] Skip Repeated end') mylog('verbose','[Skip Repeated Notifications] Skip Repeated end')
db.commitDB() db.commitDB()
#=============================================================================== #===============================================================================
@@ -651,10 +448,10 @@ def skip_repeated_notifications (db):
def check_and_run_event(db): def check_and_run_event(db):
sql = db.sql # TO-DO sql = db.sql # TO-DO
sql.execute(""" select * from Parameters where par_ID = "Front_Event" """) sql.execute(""" select * from Parameters where par_ID = "Front_Event" """)
rows = sql.fetchall() rows = sql.fetchall()
event, param = ['',''] event, param = ['','']
if len(rows) > 0 and rows[0]['par_Value'] != 'finished': if len(rows) > 0 and rows[0]['par_Value'] != 'finished':
event = rows[0]['par_Value'].split('|')[0] event = rows[0]['par_Value'].split('|')[0]
param = rows[0]['par_Value'].split('|')[1] param = rows[0]['par_Value'].split('|')[1]
else: else:
@@ -666,45 +463,47 @@ def check_and_run_event(db):
handle_run(param) handle_run(param)
# clear event execution flag # clear event execution flag
sql.execute ("UPDATE Parameters SET par_Value='finished' WHERE par_ID='Front_Event'") sql.execute ("UPDATE Parameters SET par_Value='finished' WHERE par_ID='Front_Event'")
# commit to DB # commit to DB
db.commitDB() db.commitDB()
#------------------------------------------------------------------------------- #-------------------------------------------------------------------------------
def handle_run(runType): def handle_run(runType):
global last_network_scan global last_network_scan
mylog('info', ['[', timeNow(), '] START Run: ', runType]) mylog('info', ['[', timeNow(), '] START Run: ', runType])
if runType == 'ENABLE_ARPSCAN': if runType == 'ENABLE_ARPSCAN':
last_network_scan = conf.time_started - datetime.timedelta(hours = 24) last_network_scan = conf.time_started - datetime.timedelta(hours = 24)
mylog('info', ['[', timeNow(), '] END Run: ', runType]) mylog('info', ['[', timeNow(), '] END Run: ', runType])
#------------------------------------------------------------------------------- #-------------------------------------------------------------------------------
def handle_test(testType): def handle_test(testType):
mylog('info', ['[', timeNow(), '] START Test: ', testType]) mylog('info', ['[', timeNow(), '] START Test: ', testType])
# Open text sample # Open text sample
sample_txt = get_file_content(pialertPath + '/back/report_sample.txt') sample_txt = get_file_content(pialertPath + '/back/report_sample.txt')
# Open html sample # Open html sample
sample_html = get_file_content(pialertPath + '/back/report_sample.html') sample_html = get_file_content(pialertPath + '/back/report_sample.html')
# Open json sample and get only the payload part # Open json sample and get only the payload part
sample_json_payload = json.loads(get_file_content(pialertPath + '/back/webhook_json_sample.json'))[0]["body"]["attachments"][0]["text"] sample_json_payload = json.loads(get_file_content(pialertPath + '/back/webhook_json_sample.json'))[0]["body"]["attachments"][0]["text"]
if testType == 'REPORT_MAIL':
send_email(sample_txt, sample_html)
if testType == 'REPORT_WEBHOOK':
send_webhook (sample_json_payload, sample_txt)
if testType == 'REPORT_APPRISE':
send_apprise (sample_html, sample_txt)
if testType == 'REPORT_NTFY':
send_ntfy (sample_txt)
if testType == 'REPORT_PUSHSAFER':
send_pushsafer (sample_txt)
mylog('info', ['[', timeNow(), '] END Test: ', testType]) sample_msg = noti_struc(sample_json_payload, sample_txt, sample_html )
if testType == 'REPORT_MAIL':
send_email(sample_msg)
if testType == 'REPORT_WEBHOOK':
send_webhook (sample_msg)
if testType == 'REPORT_APPRISE':
send_apprise (sample_msg)
if testType == 'REPORT_NTFY':
send_ntfy (sample_msg)
if testType == 'REPORT_PUSHSAFER':
send_pushsafer (sample_msg)
mylog('info', ['[Test Publishers] END Test: ', testType])

1
test/__init__.py Normal file
View File

@@ -0,0 +1 @@
""" tests for Pi.Alert """

29
test/test_helper.py Normal file
View File

@@ -0,0 +1,29 @@
import sys
import pathlib
sys.path.append(str(pathlib.Path(__file__).parent.parent.resolve()) + "/pialert/")
import datetime
from helper import timeNow, updateSubnets
# -------------------------------------------------------------------------------
def test_helper():
assert timeNow() == datetime.datetime.now().replace(microsecond=0)
# -------------------------------------------------------------------------------
def test_updateSubnets():
# test single subnet
subnet = "192.168.1.0/24 --interface=eth0"
result = updateSubnets(subnet)
assert type(result) is list
assert len(result) == 1
# test multip subnets
subnet = ["192.168.1.0/24 --interface=eth0", "192.168.2.0/24 --interface=eth1"]
result = updateSubnets(subnet)
assert type(result) is list
assert len(result) == 2