convert pholus to plugin v0.2

This commit is contained in:
Jokob-sk
2023-08-24 15:54:31 +10:00
parent 3b60a3a1ae
commit 445b4de69e
14 changed files with 220 additions and 401 deletions

View File

@@ -24,7 +24,7 @@ import multiprocessing
import conf
from const import *
from logger import mylog
from helper import filePermissions, isNewVersion, timeNowTZ, updateState
from helper import filePermissions, isNewVersion, timeNowTZ, updateState, get_setting_value
from api import update_api
from networkscan import process_scan
from initialise import importConfigs
@@ -34,7 +34,6 @@ from reporting import check_and_run_event, send_notifications
from plugin import run_plugin_scripts
# different scanners
from scanners.pholusscan import performPholusScan
from scanners.nmapscan import performNmapScan
from scanners.internet import check_internet_IP
@@ -60,7 +59,6 @@ main structure of Pi Alert
run plugins (scheduled)
check internet IP
check vendor
run PHOLUS
run NMAP
run "scan_network()"
processing scan results
@@ -161,25 +159,7 @@ def main ():
conf.last_update_vendors = loop_start_time
conf.cycle = 'update_vendors'
mylog('verbose', ['[MAIN] cycle:',conf.cycle])
update_devices_MAC_vendors(db)
# Execute scheduled or one-off Pholus scan if enabled and run conditions fulfilled
if conf.PHOLUS_RUN == "schedule" or conf.PHOLUS_RUN == "once":
pholusSchedule = [sch for sch in conf.mySchedules if sch.service == "pholus"][0]
run = False
# run once after application starts
if conf.PHOLUS_RUN == "once" and pholusSchedule.last_run == 0:
run = True
# run if overdue scheduled time
if conf.PHOLUS_RUN == "schedule":
run = pholusSchedule.runScheduleCheck()
if run:
pholusSchedule.last_run = datetime.datetime.now(conf.tz).replace(microsecond=0)
performPholusScan(db, conf.PHOLUS_RUN_TIMEOUT, conf.userSubnets)
update_devices_MAC_vendors(db)
# Execute scheduled or one-off Nmap scan if enabled and run conditions fulfilled
if conf.NMAP_RUN == "schedule" or conf.NMAP_RUN == "once":
@@ -236,7 +216,7 @@ def main ():
conf.last_cleanup = loop_start_time
conf.cycle = 'cleanup'
mylog('verbose', ['[MAIN] cycle:',conf.cycle])
db.cleanup_database(startTime, conf.DAYS_TO_KEEP_EVENTS, conf.PHOLUS_DAYS_DATA, conf.HRS_TO_KEEP_NEWDEV, conf.PLUGINS_KEEP_HIST)
db.cleanup_database(startTime, conf.DAYS_TO_KEEP_EVENTS, get_setting_value('PHOLUS_DAYS_DATA'), conf.HRS_TO_KEEP_NEWDEV, conf.PLUGINS_KEEP_HIST)
# Commit SQL
db.commitDB()

View File

@@ -103,15 +103,6 @@ DDNS_USER = 'dynu_user'
DDNS_PASSWORD = 'A0000000B0000000C0000000D0000000'
DDNS_UPDATE_URL = 'https://api.dynu.com/nic/update?'
# PHOLUS
PHOLUS_ACTIVE = False
PHOLUS_TIMEOUT = 20
PHOLUS_FORCE = False
PHOLUS_RUN = 'once'
PHOLUS_RUN_TIMEOUT = 600
PHOLUS_RUN_SCHD = '0 4 * * *'
PHOLUS_DAYS_DATA = 0
# Nmap
NMAP_ACTIVE = True
NMAP_TIMEOUT = 150

View File

@@ -3,11 +3,11 @@ import subprocess
import conf
import re
from helper import timeNowTZ, get_setting, get_setting_value
from helper import timeNowTZ, get_setting, get_setting_value,resolve_device_name_dig, resolve_device_name_pholus
from scanners.internet import check_IP_format, get_internet_IP
from logger import mylog, print_log
from mac_vendor import query_MAC_vendor
from scanners.pholusscan import performPholusScan, resolve_device_name_dig, resolve_device_name_pholus
#-------------------------------------------------------------------------------
@@ -363,10 +363,6 @@ def update_devices_names (db):
unknownDevices = sql.fetchall()
db.commitDB()
# perform Pholus scan if (unknown) devices found
if conf.PHOLUS_ACTIVE and (len(unknownDevices) > 0 or conf.PHOLUS_FORCE):
performPholusScan(db, conf.PHOLUS_TIMEOUT, conf.userSubnets)
# skip checks if no unknown devices
if len(unknownDevices) == 0 and conf.PHOLUS_FORCE == False:
return

View File

@@ -6,6 +6,7 @@ import datetime
import os
import re
import subprocess
import pytz
from pytz import timezone
from datetime import timedelta
import json
@@ -20,7 +21,12 @@ from logger import mylog, logResult
#-------------------------------------------------------------------------------
def timeNowTZ():
return datetime.datetime.now(conf.tz).replace(microsecond=0)
if isinstance(conf.TIMEZONE, str):
tz = pytz.timezone(conf.TIMEZONE)
else:
tz = conf.TIMEZONE
return datetime.datetime.now(tz).replace(microsecond=0)
def timeNow():
return datetime.datetime.now().replace(microsecond=0)
@@ -370,4 +376,135 @@ def get_setting_value(key):
return setVal
return ''
return ''
#-------------------------------------------------------------------------------
# Disclaimer - I'm interfacing with a script I didn't write (pholus3.py) so it's possible I'm missing types of answers
# it's also possible the pholus3.py script can be adjusted to provide a better output to interface with it
# Hit me with a PR if you know how! :)
def resolve_device_name_pholus (pMAC, pIP, allRes):
pholusMatchesIndexes = []
index = 0
for result in allRes:
# limiting entries used for name resolution to the ones containing the current IP (v4 only)
if result["MAC"] == pMAC and result["Record_Type"] == "Answer" and result["IP_v4_or_v6"] == pIP and '._googlezone' not in result["Value"]:
# found entries with a matching MAC address, let's collect indexes
pholusMatchesIndexes.append(index)
index += 1
# return if nothing found
if len(pholusMatchesIndexes) == 0:
return -1
# we have some entries let's try to select the most useful one
# airplay matches contain a lot of information
# Matches for example:
# Brand Tv (50)._airplay._tcp.local. TXT Class:32769 "acl=0 deviceid=66:66:66:66:66:66 features=0x77777,0x38BCB46 rsf=0x3 fv=p20.T-FFFFFF-03.1 flags=0x204 model=XXXX manufacturer=Brand serialNumber=XXXXXXXXXXX protovers=1.1 srcvers=777.77.77 pi=FF:FF:FF:FF:FF:FF psi=00000000-0000-0000-0000-FFFFFFFFFF gid=00000000-0000-0000-0000-FFFFFFFFFF gcgl=0 pk=AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
for i in pholusMatchesIndexes:
if checkIPV4(allRes[i]['IP_v4_or_v6']) and '._airplay._tcp.local. TXT Class:32769' in str(allRes[i]["Value"]) :
return allRes[i]["Value"].split('._airplay._tcp.local. TXT Class:32769')[0]
# second best - contains airplay
# Matches for example:
# _airplay._tcp.local. PTR Class:IN "Brand Tv (50)._airplay._tcp.local."
for i in pholusMatchesIndexes:
if checkIPV4(allRes[i]['IP_v4_or_v6']) and '_airplay._tcp.local. PTR Class:IN' in allRes[i]["Value"] and ('._googlecast') not in allRes[i]["Value"]:
return cleanResult(allRes[i]["Value"].split('"')[1])
# Contains PTR Class:32769
# Matches for example:
# 3.1.168.192.in-addr.arpa. PTR Class:32769 "MyPc.local."
for i in pholusMatchesIndexes:
if checkIPV4(allRes[i]['IP_v4_or_v6']) and 'PTR Class:32769' in allRes[i]["Value"]:
return cleanResult(allRes[i]["Value"].split('"')[1])
# Contains AAAA Class:IN
# Matches for example:
# DESKTOP-SOMEID.local. AAAA Class:IN "fe80::fe80:fe80:fe80:fe80"
for i in pholusMatchesIndexes:
if checkIPV4(allRes[i]['IP_v4_or_v6']) and 'AAAA Class:IN' in allRes[i]["Value"]:
return cleanResult(allRes[i]["Value"].split('.local.')[0])
# Contains _googlecast._tcp.local. PTR Class:IN
# Matches for example:
# _googlecast._tcp.local. PTR Class:IN "Nest-Audio-ff77ff77ff77ff77ff77ff77ff77ff77._googlecast._tcp.local."
for i in pholusMatchesIndexes:
if checkIPV4(allRes[i]['IP_v4_or_v6']) and '_googlecast._tcp.local. PTR Class:IN' in allRes[i]["Value"] and ('Google-Cast-Group') not in allRes[i]["Value"]:
return cleanResult(allRes[i]["Value"].split('"')[1])
# Contains A Class:32769
# Matches for example:
# Android.local. A Class:32769 "192.168.1.6"
for i in pholusMatchesIndexes:
if checkIPV4(allRes[i]['IP_v4_or_v6']) and ' A Class:32769' in allRes[i]["Value"]:
return cleanResult(allRes[i]["Value"].split(' A Class:32769')[0])
# # Contains PTR Class:IN
# Matches for example:
# _esphomelib._tcp.local. PTR Class:IN "ceiling-light-1._esphomelib._tcp.local."
for i in pholusMatchesIndexes:
if checkIPV4(allRes[i]['IP_v4_or_v6']) and 'PTR Class:IN' in allRes[i]["Value"]:
if allRes[i]["Value"] and len(allRes[i]["Value"].split('"')) > 1:
return cleanResult(allRes[i]["Value"].split('"')[1])
return -1
#-------------------------------------------------------------------------------
def resolve_device_name_dig (pMAC, pIP):
newName = ""
try :
dig_args = ['dig', '+short', '-x', pIP]
# Execute command
try:
# try runnning a subprocess
newName = subprocess.check_output (dig_args, universal_newlines=True)
except subprocess.CalledProcessError as e:
# An error occured, handle it
mylog('none', ['[device_name_dig] ', e.output])
# newName = "Error - check logs"
return -1
# Check returns
newName = newName.strip()
if len(newName) == 0 :
return -1
# Cleanup
newName = cleanResult(newName)
if newName == "" or len(newName) == 0:
return -1
# Return newName
return newName
# not Found
except subprocess.CalledProcessError :
return -1
#-------------------------------------------------------------------------------
def cleanResult(str):
# alternative str.split('.')[0]
str = str.replace("._airplay", "")
str = str.replace("._tcp", "")
str = str.replace(".local", "")
str = str.replace("._esphomelib", "")
str = str.replace("._googlecast", "")
str = str.replace(".lan", "")
str = str.replace(".home", "")
str = re.sub(r'-[a-fA-F0-9]{32}', '', str) # removing last part of e.g. Nest-Audio-ff77ff77ff77ff77ff77ff77ff77ff77
str = re.sub(r'#.*', '', str) # Remove everything after '#' including the '#'
# remove trailing dots
if str.endswith('.'):
str = str[:-1]
return str

View File

@@ -154,15 +154,6 @@ def importConfigs (db):
conf.DDNS_PASSWORD = ccd('DDNS_PASSWORD', 'A0000000B0000000C0000000D0000000' , c_d, 'DynDNS password', 'password', '', 'DynDNS')
conf.DDNS_UPDATE_URL = ccd('DDNS_UPDATE_URL', 'https://api.dynu.com/nic/update?' , c_d, 'DynDNS update URL', 'text', '', 'DynDNS')
# PHOLUS
conf.PHOLUS_ACTIVE = ccd('PHOLUS_ACTIVE', False , c_d, 'Enable Pholus scans', 'boolean', '', 'Pholus')
conf.PHOLUS_TIMEOUT = ccd('PHOLUS_TIMEOUT', 20 , c_d, 'Pholus timeout', 'integer', '', 'Pholus')
conf.PHOLUS_FORCE = ccd('PHOLUS_FORCE', False , c_d, 'Pholus force check', 'boolean', '', 'Pholus')
conf.PHOLUS_RUN = ccd('PHOLUS_RUN', 'once' , c_d, 'Pholus enable schedule', 'text.select', "['disabled', 'once', 'schedule']", 'Pholus')
conf.PHOLUS_RUN_TIMEOUT = ccd('PHOLUS_RUN_TIMEOUT', 600 , c_d, 'Pholus timeout schedule', 'integer', '', 'Pholus')
conf.PHOLUS_RUN_SCHD = ccd('PHOLUS_RUN_SCHD', '0 4 * * *' , c_d, 'Pholus schedule', 'text', '', 'Pholus')
conf.PHOLUS_DAYS_DATA = ccd('PHOLUS_DAYS_DATA', 0 , c_d, 'Pholus keep days', 'integer', '', 'Pholus')
# Nmap
conf.NMAP_ACTIVE = ccd('NMAP_ACTIVE', True , c_d, 'Enable Nmap scans', 'boolean', '', 'Nmap')
conf.NMAP_TIMEOUT = ccd('NMAP_TIMEOUT', 150 , c_d, 'Nmap timeout', 'integer', '', 'Nmap')
@@ -197,11 +188,6 @@ def importConfigs (db):
# reset schedules
conf.mySchedules = []
# init pholus schedule
pholusSchedule = Cron(conf.PHOLUS_RUN_SCHD).schedule(start_date=datetime.datetime.now(conf.tz))
conf.mySchedules.append(schedule_class("pholus", pholusSchedule, pholusSchedule.next(), False))
# init nmap schedule
nmapSchedule = Cron(conf.NMAP_RUN_SCHD).schedule(start_date=datetime.datetime.now(conf.tz))
conf.mySchedules.append(schedule_class("nmap", nmapSchedule, nmapSchedule.next(), False))

View File

@@ -135,7 +135,7 @@ def resolve_wildcards_arr(commandArr, params):
for comPart in commandArr:
commandArr[i] = comPart.replace('{' + param[0] + '}', param[1]).replace('{s-quote}',"'")
commandArr[i] = comPart.replace('{' + str(param[0]) + '}', str(param[1])).replace('{s-quote}',"'")
i += 1

View File

@@ -7,196 +7,3 @@ from logger import mylog
#-------------------------------------------------------------------------------
def performPholusScan (db, timeoutSec, userSubnets):
sql = db.sql # TO-DO
# scan every interface
for subnet in userSubnets:
temp = subnet.split("--interface=")
if len(temp) != 2:
mylog('none', ["[PholusScan] Skip scan (need subnet in format '192.168.1.0/24 --inteface=eth0'), got: ", subnet])
return
mask = temp[0].strip()
interface = temp[1].strip()
# logging & updating app state
updateState(db,"Scan: Pholus")
mylog('none', ['[PholusScan] Scan: Pholus for ', str(timeoutSec), 's ('+ str(round(int(timeoutSec) / 60, 1)) +'min)'])
mylog('verbose', ["[PholusScan] Pholus scan on [interface] ", interface, " [mask] " , mask])
# the scan always lasts 2x as long, so the desired user time from settings needs to be halved
adjustedTimeout = str(round(int(timeoutSec) / 2, 0))
# python3 -m trace --trace /home/pi/pialert/pholus/pholus3.py eth1 -rdns_scanning 192.168.1.0/24 -stimeout 600
pholus_args = ['python3', fullPholusPath, interface, "-rdns_scanning", mask, "-stimeout", adjustedTimeout]
# Execute command
output = ""
try:
# try runnning a subprocess with a forced (timeout + 30 seconds) in case the subprocess hangs
output = subprocess.check_output (pholus_args, universal_newlines=True, stderr=subprocess.STDOUT, timeout=(timeoutSec + 30))
except subprocess.CalledProcessError as e:
# An error occured, handle it
mylog('none', ['[PholusScan]', e.output])
mylog('none', ["[PholusScan] Error - Pholus Scan - check logs"])
except subprocess.TimeoutExpired as timeErr:
mylog('none', ['[PholusScan] Pholus TIMEOUT - the process forcefully terminated as timeout reached'])
if output == "": # check if the subprocess failed
mylog('none', ['[PholusScan] Scan: Pholus FAIL - check logs'])
else:
mylog('verbose', ['[PholusScan] Scan: Pholus SUCCESS'])
# check the last run output
f = open(logPath + '/pialert_pholus_lastrun.log', 'r+')
newLines = f.read().split('\n')
f.close()
# cleanup - select only lines containing a separator to filter out unnecessary data
newLines = list(filter(lambda x: '|' in x, newLines))
# build SQL query parameters to insert into the DB
params = []
for line in newLines:
columns = line.split("|")
if len(columns) == 4:
params.append(( interface + " " + mask, timeNowTZ() , columns[0].replace(" ", ""), columns[1].replace(" ", ""), columns[2].replace(" ", ""), columns[3], ''))
if len(params) > 0:
sql.executemany ("""INSERT INTO Pholus_Scan ("Info", "Time", "MAC", "IP_v4_or_v6", "Record_Type", "Value", "Extra") VALUES (?, ?, ?, ?, ?, ?, ?)""", params)
db.commitDB()
#-------------------------------------------------------------------------------
def cleanResult(str):
# alternative str.split('.')[0]
str = str.replace("._airplay", "")
str = str.replace("._tcp", "")
str = str.replace(".local", "")
str = str.replace("._esphomelib", "")
str = str.replace("._googlecast", "")
str = str.replace(".lan", "")
str = str.replace(".home", "")
str = re.sub(r'-[a-fA-F0-9]{32}', '', str) # removing last part of e.g. Nest-Audio-ff77ff77ff77ff77ff77ff77ff77ff77
str = re.sub(r'#.*', '', str) # Remove everything after '#' including the '#'
# remove trailing dots
if str.endswith('.'):
str = str[:-1]
return str
# Disclaimer - I'm interfacing with a script I didn't write (pholus3.py) so it's possible I'm missing types of answers
# it's also possible the pholus3.py script can be adjusted to provide a better output to interface with it
# Hit me with a PR if you know how! :)
def resolve_device_name_pholus (pMAC, pIP, allRes):
pholusMatchesIndexes = []
index = 0
for result in allRes:
# limiting entries used for name resolution to the ones containing the current IP (v4 only)
if result["MAC"] == pMAC and result["Record_Type"] == "Answer" and result["IP_v4_or_v6"] == pIP and '._googlezone' not in result["Value"]:
# found entries with a matching MAC address, let's collect indexes
pholusMatchesIndexes.append(index)
index += 1
# return if nothing found
if len(pholusMatchesIndexes) == 0:
return -1
# we have some entries let's try to select the most useful one
# airplay matches contain a lot of information
# Matches for example:
# Brand Tv (50)._airplay._tcp.local. TXT Class:32769 "acl=0 deviceid=66:66:66:66:66:66 features=0x77777,0x38BCB46 rsf=0x3 fv=p20.T-FFFFFF-03.1 flags=0x204 model=XXXX manufacturer=Brand serialNumber=XXXXXXXXXXX protovers=1.1 srcvers=777.77.77 pi=FF:FF:FF:FF:FF:FF psi=00000000-0000-0000-0000-FFFFFFFFFF gid=00000000-0000-0000-0000-FFFFFFFFFF gcgl=0 pk=AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
for i in pholusMatchesIndexes:
if checkIPV4(allRes[i]['IP_v4_or_v6']) and '._airplay._tcp.local. TXT Class:32769' in str(allRes[i]["Value"]) :
return allRes[i]["Value"].split('._airplay._tcp.local. TXT Class:32769')[0]
# second best - contains airplay
# Matches for example:
# _airplay._tcp.local. PTR Class:IN "Brand Tv (50)._airplay._tcp.local."
for i in pholusMatchesIndexes:
if checkIPV4(allRes[i]['IP_v4_or_v6']) and '_airplay._tcp.local. PTR Class:IN' in allRes[i]["Value"] and ('._googlecast') not in allRes[i]["Value"]:
return cleanResult(allRes[i]["Value"].split('"')[1])
# Contains PTR Class:32769
# Matches for example:
# 3.1.168.192.in-addr.arpa. PTR Class:32769 "MyPc.local."
for i in pholusMatchesIndexes:
if checkIPV4(allRes[i]['IP_v4_or_v6']) and 'PTR Class:32769' in allRes[i]["Value"]:
return cleanResult(allRes[i]["Value"].split('"')[1])
# Contains AAAA Class:IN
# Matches for example:
# DESKTOP-SOMEID.local. AAAA Class:IN "fe80::fe80:fe80:fe80:fe80"
for i in pholusMatchesIndexes:
if checkIPV4(allRes[i]['IP_v4_or_v6']) and 'AAAA Class:IN' in allRes[i]["Value"]:
return cleanResult(allRes[i]["Value"].split('.local.')[0])
# Contains _googlecast._tcp.local. PTR Class:IN
# Matches for example:
# _googlecast._tcp.local. PTR Class:IN "Nest-Audio-ff77ff77ff77ff77ff77ff77ff77ff77._googlecast._tcp.local."
for i in pholusMatchesIndexes:
if checkIPV4(allRes[i]['IP_v4_or_v6']) and '_googlecast._tcp.local. PTR Class:IN' in allRes[i]["Value"] and ('Google-Cast-Group') not in allRes[i]["Value"]:
return cleanResult(allRes[i]["Value"].split('"')[1])
# Contains A Class:32769
# Matches for example:
# Android.local. A Class:32769 "192.168.1.6"
for i in pholusMatchesIndexes:
if checkIPV4(allRes[i]['IP_v4_or_v6']) and ' A Class:32769' in allRes[i]["Value"]:
return cleanResult(allRes[i]["Value"].split(' A Class:32769')[0])
# # Contains PTR Class:IN
# Matches for example:
# _esphomelib._tcp.local. PTR Class:IN "ceiling-light-1._esphomelib._tcp.local."
for i in pholusMatchesIndexes:
if checkIPV4(allRes[i]['IP_v4_or_v6']) and 'PTR Class:IN' in allRes[i]["Value"]:
if allRes[i]["Value"] and len(allRes[i]["Value"].split('"')) > 1:
return cleanResult(allRes[i]["Value"].split('"')[1])
return -1
#-------------------------------------------------------------------------------
def resolve_device_name_dig (pMAC, pIP):
newName = ""
try :
dig_args = ['dig', '+short', '-x', pIP]
# Execute command
try:
# try runnning a subprocess
newName = subprocess.check_output (dig_args, universal_newlines=True)
except subprocess.CalledProcessError as e:
# An error occured, handle it
mylog('none', ['[device_name_dig] ', e.output])
# newName = "Error - check logs"
return -1
# Check returns
newName = newName.strip()
if len(newName) == 0 :
return -1
# Cleanup
newName = cleanResult(newName)
if newName == "" or len(newName) == 0:
return -1
# Return newName
return newName
# not Found
except subprocess.CalledProcessError :
return -1