PIHOLE to plugin rewrite

This commit is contained in:
Jokob-sk
2023-08-08 08:05:42 +10:00
parent c9f4560cf9
commit 22bfb9deef
2 changed files with 5 additions and 241 deletions

View File

@@ -1,27 +1,15 @@
## Overview
A plugin allowing for importing Un-Discoverable devices from the settings page.
The main usecase is to add dumb network gear like unmanaged hubs and switches to the network view.
There might be other usecases, please let me know.
A plugin allowing for importing devices from the PiHole database. This is an import plugin using an SQLite database as a source.
### Usage
- Go to settings and find Un-Discoverabe Devices in the list of plugins.
- Enable the plugin by changing the RUN parameter from disabled to `once` or `always_after_scan`.
- Add the name of your device to the list. (remove the sample entry first)
- SAVE
- wait for the next scan to finish
- TBD
#### Examples:
Settings:
![settings](https://github.com/Data-Monkey/Pi.Alert/assets/7224371/52883307-19a5-4602-b13a-9825461f6cc4)
resulting in these devices:
![devices](https://github.com/Data-Monkey/Pi.Alert/assets/7224371/9f7659e7-75a8-4ae9-9f5f-781bdbcbc949)
Allowing Un-Discoverable devices like hubs, switches or APs to be added to the network view.
![network](https://github.com/Data-Monkey/Pi.Alert/assets/7224371/b5ccc3b3-f5fd-4f5b-b0f0-e4e637c6da33)
TBD
### Known Limitations
- Un-Discoverable Devices always show as offline. That is expected as they can not be discovered by Pi.Alert.
- All IPs are set to 0.0.0.0 therefore the "Random MAC" icon might show up.
- TBD

View File

@@ -1,224 +0,0 @@
#!/usr/bin/env python
import sqlite3
import os
import pathlib
import argparse
import sys
import re
import base64
import subprocess
from time import strftime
sys.path.append("/home/pi/pialert/front/plugins")
from plugin_helper import Plugin_Object, Plugin_Objects
""" module to import db and leases from PiHole """
piholeDB = '/etc/pihole/pihole-FTL.db'
pialertPath = '/home/pi/pialert'
dbPath = '/db/pialert.db'
fullPiAlertDbPath = pialertPath + dbPath
CUR_PATH = str(pathlib.Path(__file__).parent.resolve())
LOG_FILE = os.path.join(CUR_PATH, 'script.log')
RESULT_FILE = os.path.join(CUR_PATH, 'last_result.log')
def main():
fullPiAlertDbPath
# # sample
# # /home/pi/pialert/front/plugins/arp_scan/script.py userSubnets=b'MTkyLjE2OC4xLjAvMjQgLS1pbnRlcmZhY2U9ZXRoMQ=='
# # the script expects a parameter in the format of userSubnets=subnet1,subnet2,...
# parser = argparse.ArgumentParser(description='Import devices from settings')
# parser.add_argument('userSubnets', nargs='+', help="list of subnets with options")
# values = parser.parse_args()
# # Assuming Plugin_Objects is a class or function that reads data from the RESULT_FILE
# # and returns a list of objects called 'devices'.
# devices = Plugin_Objects(RESULT_FILE)
# # Print a message to indicate that the script is starting.
# print('In script:')
# # Assuming 'values' is a dictionary or object that contains a key 'userSubnets'
# # which holds a list of user-submitted subnets.
# # Printing the userSubnets list to check its content.
# print(values.userSubnets)
# # Extract the base64-encoded subnet information from the first element of the userSubnets list.
# # The format of the element is assumed to be like 'userSubnets=b<base64-encoded-data>'.
# userSubnetsParamBase64 = values.userSubnets[0].split('userSubnets=b')[1]
# # Printing the extracted base64-encoded subnet information.
# print(userSubnetsParamBase64)
# # Decode the base64-encoded subnet information to get the actual subnet information in ASCII format.
# userSubnetsParam = base64.b64decode(userSubnetsParamBase64).decode('ascii')
# # Print the decoded subnet information.
# print('userSubnetsParam:')
# print(userSubnetsParam)
# # Check if the decoded subnet information contains multiple subnets separated by commas.
# # If it does, split the string into a list of individual subnets.
# # Otherwise, create a list with a single element containing the subnet information.
# if ',' in userSubnetsParam:
# subnets_list = userSubnetsParam.split(',')
# else:
# subnets_list = [userSubnetsParam]
# # Execute the ARP scanning process on the list of subnets (whether it's one or multiple subnets).
# # The function 'execute_arpscan' is assumed to be defined elsewhere in the code.
# unique_devices = execute_arpscan(subnets_list)
# for device in unique_devices:
# devices.add_object(
# primaryId=device['mac'], # MAC (Device Name)
# secondaryId=device['ip'], # IP Address
# watched1=device['ip'], # Device Name
# watched2=device.get('hw', ''), # Vendor (assuming it's in the 'hw' field)
# watched3=device.get('interface', ''), # Add the interface
# watched4='',
# extra='arp-scan',
# foreignKey="")
# devices.write_result_file()
# return 0
def execute_arpscan(userSubnets):
# output of possible multiple interfaces
arpscan_output = ""
devices_list = []
# scan each interface
for interface in userSubnets :
arpscan_output = execute_arpscan_on_interface (interface)
print(arpscan_output)
# Search IP + MAC + Vendor as regular expresion
re_ip = r'(?P<ip>((2[0-5]|1[0-9]|[0-9])?[0-9]\.){3}((2[0-5]|1[0-9]|[0-9])?[0-9]))'
re_mac = r'(?P<mac>([0-9a-fA-F]{2}[:-]){5}([0-9a-fA-F]{2}))'
re_hw = r'(?P<hw>.*)'
re_pattern = re.compile (re_ip + '\s+' + re_mac + '\s' + re_hw)
devices_list_tmp = [
{**device.groupdict(), "interface": interface}
for device in re.finditer(re_pattern, arpscan_output)
]
devices_list += devices_list_tmp
# mylog('debug', ['[ARP Scan] Found: Devices including duplicates ', len(devices_list) ])
# Delete duplicate MAC
unique_mac = []
unique_devices = []
for device in devices_list :
if device['mac'] not in unique_mac:
unique_mac.append(device['mac'])
unique_devices.append(device)
# return list
# mylog('debug', ['[ARP Scan] Found: Devices without duplicates ', len(unique_devices) ])
print("Devices List len:", len(devices_list)) # Add this line to print devices_list
print("Devices List:", devices_list) # Add this line to print devices_list
return devices_list
def execute_arpscan_on_interface(interface):
# Prepare command arguments
arpscan_args = ['sudo', 'arp-scan', '--ignoredups', '--retry=6'] + interface.split()
# Execute command
try:
# try running a subprocess safely
result = subprocess.check_output(arpscan_args, universal_newlines=True)
except subprocess.CalledProcessError as e:
# An error occurred, handle it
error_type = type(e).__name__ # Capture the error type
result = ""
return result
#-------------------------------------------------------------------------------
def copy_pihole_network (db):
"""
attach the PiHole Database and copy the PiHole_Network table accross into the PiAlert DB
"""
sql = db.sql # TO-DO
# Open Pi-hole DB
print('[PiHole Network] - attach PiHole DB')
try:
sql.execute ("ATTACH DATABASE '"+ piholeDB +"' AS PH")
except sqlite3.Error as e:
print(f'[PiHole Network] - SQL ERROR: {e}')
# Copy Pi-hole Network table
try:
sql.execute ("DELETE FROM PiHole_Network")
# just for reporting
new_devices = []
sql.execute ( """SELECT hwaddr, macVendor, lastQuery,
(SELECT name FROM PH.network_addresses
WHERE network_id = id ORDER BY lastseen DESC, ip),
(SELECT ip FROM PH.network_addresses
WHERE network_id = id ORDER BY lastseen DESC, ip)
FROM PH.network
WHERE hwaddr NOT LIKE 'ip-%'
AND hwaddr <> '00:00:00:00:00:00' """)
new_devices = sql.fetchall()
# insert into PiAlert DB
sql.execute ("""INSERT INTO PiHole_Network (PH_MAC, PH_Vendor, PH_LastQuery,
PH_Name, PH_IP)
SELECT hwaddr, macVendor, lastQuery,
(SELECT name FROM PH.network_addresses
WHERE network_id = id ORDER BY lastseen DESC, ip),
(SELECT ip FROM PH.network_addresses
WHERE network_id = id ORDER BY lastseen DESC, ip)
FROM PH.network
WHERE hwaddr NOT LIKE 'ip-%'
AND hwaddr <> '00:00:00:00:00:00' """)
sql.execute ("""UPDATE PiHole_Network SET PH_Name = '(unknown)'
WHERE PH_Name IS NULL OR PH_Name = '' """)
# Close Pi-hole DB
sql.execute ("DETACH PH")
except sqlite3.Error as e:
print(f'[PiHole Network] - SQL ERROR: {e}')
db.commitDB()
print('[PiHole Network] - completed - found ', len(new_devices), ' devices')
return str(sql.rowcount) != "0"
#-------------------------------------------------------------------------------
#===============================================================================
# BEGIN
#===============================================================================
if __name__ == '__main__':
main()