chore:PHOLUS removal

This commit is contained in:
jokob-sk
2024-11-22 20:32:49 +11:00
parent e776c3ac41
commit 0e438ffd57
29 changed files with 64 additions and 3191 deletions

View File

@@ -476,10 +476,6 @@ class DB():
self.sql.execute(""" DROP VIEW IF EXISTS Sessions_Devices;""")
self.sql.execute("""CREATE VIEW Sessions_Devices AS SELECT * FROM Sessions LEFT JOIN "Devices" ON ses_MAC = devMac;""")
# -------------------------------------------------------------------------
# Settings table setup
# -------------------------------------------------------------------------
@@ -505,24 +501,9 @@ class DB():
""")
# -------------------------------------------------------------------------
# Pholus_Scan table setup
# -------------------------------------------------------------------------
# Create Pholus_Scan table if missing
mylog('verbose', ["[upgradeDB] Re-creating Pholus_Scan table"])
self.sql.execute("""CREATE TABLE IF NOT EXISTS "Pholus_Scan" (
"Index" INTEGER,
"Info" TEXT,
"Time" TEXT,
"MAC" TEXT,
"IP_v4_or_v6" TEXT,
"Record_Type" TEXT,
"Value" TEXT,
"Extra" TEXT,
PRIMARY KEY("Index" AUTOINCREMENT)
);
""")
mylog('verbose', ["[upgradeDB] Removing Pholus_Scan table"])
self.sql.execute("""DROP TABLE IF EXISTS Pholus_Scan""")
# -------------------------------------------------------------------------

View File

@@ -4,7 +4,7 @@ import subprocess
import conf
import os
import re
from helper import timeNowTZ, get_setting, get_setting_value, list_to_where, resolve_device_name_dig, resolve_device_name_pholus, get_device_name_nbtlookup, get_device_name_nslookup, get_device_name_mdns, check_IP_format, sanitize_SQL_input
from helper import timeNowTZ, get_setting, get_setting_value, list_to_where, resolve_device_name_dig, get_device_name_nbtlookup, get_device_name_nslookup, get_device_name_mdns, check_IP_format, sanitize_SQL_input
from logger import mylog, print_log
from const import vendorsPath, vendorsPathNewest, sql_generateGuid
@@ -501,7 +501,6 @@ def update_devices_names (db):
foundmDNSLookup = 0
foundNsLookup = 0
foundNbtLookup = 0
foundPholus = 0
# Gen unknown devices
sql.execute ("SELECT * FROM Devices WHERE devName IN ('(unknown)','', '(name not found)') AND devLastIP <> '-'")
@@ -515,15 +514,6 @@ def update_devices_names (db):
# Devices without name
mylog('verbose', f'[Update Device Name] Trying to resolve devices without name. Unknown devices count: {len(unknownDevices)}')
# get names from Pholus scan
sql.execute ('SELECT * FROM Pholus_Scan where "Record_Type"="Answer"')
pholusResults = list(sql.fetchall())
db.commitDB()
# Number of entries from previous Pholus scans
mylog('verbose', ['[Update Device Name] Pholus entries from prev scans: ', len(pholusResults)])
for device in unknownDevices:
newName = nameNotFound
@@ -548,26 +538,12 @@ def update_devices_names (db):
if newName != nameNotFound:
foundNsLookup += 1
# Resolve device name with NSLOOKUP plugin data
# Resolve device name with NBTLOOKUP plugin data
if newName == nameNotFound:
newName = get_device_name_nbtlookup(db, device['devMac'], device['devLastIP'])
if newName != nameNotFound:
foundNbtLookup += 1
# Resolve with Pholus
if newName == nameNotFound:
# Try MAC matching
newName = resolve_device_name_pholus (device['devMac'], device['devLastIP'], pholusResults, nameNotFound, False)
# Try IP matching
if newName == nameNotFound:
newName = resolve_device_name_pholus (device['devMac'], device['devLastIP'], pholusResults, nameNotFound, True)
# count
if newName != nameNotFound:
foundPholus += 1
# if still not found update name so we can distinguish the devices where we tried already
if newName == nameNotFound :
@@ -579,11 +555,11 @@ def update_devices_names (db):
if device['devName'] != nameNotFound:
recordsNotFound.append (["(name not found)", device['devMac']])
else:
# name was found with DiG or Pholus
# name was found
recordsToUpdate.append ([newName, device['devMac']])
# Print log
mylog('verbose', [f'[Update Device Name] Names Found (DiG/mDNS/NSLOOKUP/NBTSCAN/Pholus): {len(recordsToUpdate)} ({foundDig}/{foundmDNSLookup}/{foundNsLookup}/{foundNbtLookup}/{foundPholus})'] )
mylog('verbose', [f'[Update Device Name] Names Found (DiG/mDNS/NSLOOKUP/NBTSCAN): {len(recordsToUpdate)} ({foundDig}/{foundmDNSLookup}/{foundNsLookup}/{foundNbtLookup})'] )
mylog('verbose', [f'[Update Device Name] Names Not Found : {notFound}'] )
# update not found devices with (name not found)

View File

@@ -695,89 +695,9 @@ def resolve_device_name_dig (pMAC, pIP):
#-------------------------------------------------------------------------------
# DNS record (Pholus/Name resolution) cleanup methods
# DNS record (Name resolution) cleanup methods
#-------------------------------------------------------------------------------
#-------------------------------------------------------------------------------
# Disclaimer - I'm interfacing with a script I didn't write (pholus3.py) so it's possible I'm missing types of answers
# it's also possible the pholus3.py script can be adjusted to provide a better output to interface with it
# Hit me with a PR if you know how! :)
def resolve_device_name_pholus (pMAC, pIP, allRes, nameNotFound, match_IP = False):
pholusMatchesIndexes = []
result = nameNotFound
# Collect all Pholus entries with matching MAC and of type Answer
index = 0
for result in allRes:
# limiting entries used for name resolution to the ones containing the current IP (v4 only)
if ((match_IP and result["IP_v4_or_v6"] == pIP ) or ( result["MAC"] == pMAC )) and result["Record_Type"] == "Answer" and '._googlezone' not in result["Value"]:
# found entries with a matching MAC address, let's collect indexes
pholusMatchesIndexes.append(index)
index += 1
# return if nothing found
if len(pholusMatchesIndexes) == 0:
return nameNotFound
# we have some entries let's try to select the most useful one
# Do I need to pre-order allRes to have the most valuable onse on the top?
for i in pholusMatchesIndexes:
if not checkIPV4(allRes[i]['IP_v4_or_v6']):
continue
value = allRes[i]["Value"]
# airplay matches contain a lot of information
# Matches for example:
# Brand Tv (50)._airplay._tcp.local. TXT Class:32769 "acl=0 deviceid=66:66:66:66:66:66 features=0x77777,0x38BCB46 rsf=0x3 fv=p20.T-FFFFFF-03.1 flags=0x204 model=XXXX manufacturer=Brand serialNumber=XXXXXXXXXXX protovers=1.1 srcvers=777.77.77 pi=FF:FF:FF:FF:FF:FF psi=00000000-0000-0000-0000-FFFFFFFFFF gid=00000000-0000-0000-0000-FFFFFFFFFF gcgl=0 pk=AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
if '._airplay._tcp.local. TXT Class:32769' in value:
return cleanDeviceName(value.split('._airplay._tcp.local. TXT Class:32769')[0], match_IP)
# second best - contains airplay
# Matches for example:
# _airplay._tcp.local. PTR Class:IN "Brand Tv (50)._airplay._tcp.local."
if '_airplay._tcp.local. PTR Class:IN' in value and ('._googlecast') not in value:
return cleanDeviceName(value.split('"')[1], match_IP)
# Contains PTR Class:32769
# Matches for example:
# 3.1.168.192.in-addr.arpa. PTR Class:32769 "MyPc.local."
if 'PTR Class:32769' in value:
return cleanDeviceName(value.split('"')[1], match_IP)
# Contains AAAA Class:IN
# Matches for example:
# DESKTOP-SOMEID.local. AAAA Class:IN "fe80::fe80:fe80:fe80:fe80"
if 'AAAA Class:IN' in value:
return cleanDeviceName(value.split('.local.')[0], match_IP)
# Contains _googlecast._tcp.local. PTR Class:IN
# Matches for example:
# _googlecast._tcp.local. PTR Class:IN "Nest-Audio-ff77ff77ff77ff77ff77ff77ff77ff77._googlecast._tcp.local."
if '_googlecast._tcp.local. PTR Class:IN' in value and ('Google-Cast-Group') not in value:
return cleanDeviceName(value.split('"')[1], match_IP)
# Contains A Class:32769
# Matches for example:
# Android.local. A Class:32769 "192.168.1.6"
if ' A Class:32769' in value:
return cleanDeviceName(value.split(' A Class:32769')[0], match_IP)
# Contains PTR Class:IN
# Matches for example:
# _esphomelib._tcp.local. PTR Class:IN "ceiling-light-1._esphomelib._tcp.local."
if 'PTR Class:IN' in value and len(value.split('"')) > 1:
return cleanDeviceName(value.split('"')[1], match_IP)
return nameNotFound
#-------------------------------------------------------------------------------
import dns.resolver
def cleanDeviceName(str, match_IP):