Files
NetAlertX/front/plugins/pihole_api_scan/pihole_api_scan.py
jokob-sk 5c14b34a8b BE: linting fixes
Signed-off-by: jokob-sk <jokob.sk@gmail.com>
2025-11-22 13:14:06 +11:00

299 lines
10 KiB
Python

# !/usr/bin/env python
"""
NetAlertX plugin: PIHOLEAPI
Imports devices from Pi-hole v6 API (Network endpoints) into NetAlertX plugin results.
"""
import os
import sys
import datetime
import requests
import json
from requests.packages.urllib3.exceptions import InsecureRequestWarning
# --- NetAlertX plugin bootstrap (match example) ---
INSTALL_PATH = os.getenv('NETALERTX_APP', '/app')
sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
pluginName = 'PIHOLEAPI'
from plugin_helper import Plugin_Objects, is_mac # noqa: E402 [flake8 lint suppression]
from logger import mylog, Logger # noqa: E402 [flake8 lint suppression]
from helper import get_setting_value # noqa: E402 [flake8 lint suppression]
from const import logPath # noqa: E402 [flake8 lint suppression]
import conf # noqa: E402 [flake8 lint suppression]
from pytz import timezone # noqa: E402 [flake8 lint suppression]
# Setup timezone & logger using standard NAX helpers
conf.tz = timezone(get_setting_value('TIMEZONE'))
Logger(get_setting_value('LOG_LEVEL'))
LOG_PATH = logPath + '/plugins'
RESULT_FILE = os.path.join(LOG_PATH, f'last_result.{pluginName}.log')
plugin_objects = Plugin_Objects(RESULT_FILE)
# --- Global state for session ---
PIHOLEAPI_URL = None
PIHOLEAPI_PASSWORD = None
PIHOLEAPI_SES_VALID = False
PIHOLEAPI_SES_SID = None
PIHOLEAPI_SES_CSRF = None
PIHOLEAPI_API_MAXCLIENTS = None
PIHOLEAPI_VERIFY_SSL = True
PIHOLEAPI_RUN_TIMEOUT = 10
VERSION_DATE = "NAX-PIHOLEAPI-1.0"
# ------------------------------------------------------------------
def pihole_api_auth():
"""Authenticate to Pi-hole v6 API and populate session globals."""
global PIHOLEAPI_SES_VALID, PIHOLEAPI_SES_SID, PIHOLEAPI_SES_CSRF
if not PIHOLEAPI_URL:
mylog('none', [f'[{pluginName}] PIHOLEAPI_URL not configured — skipping.'])
return False
# handle SSL verification setting - disable insecure warnings only when PIHOLEAPI_VERIFY_SSL=False
if not PIHOLEAPI_VERIFY_SSL:
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
headers = {
"accept": "application/json",
"content-type": "application/json",
"User-Agent": "NetAlertX/" + VERSION_DATE
}
data = {"password": PIHOLEAPI_PASSWORD}
try:
resp = requests.post(PIHOLEAPI_URL + 'api/auth', headers=headers, json=data, verify=PIHOLEAPI_VERIFY_SSL, timeout=PIHOLEAPI_RUN_TIMEOUT)
resp.raise_for_status()
except requests.exceptions.Timeout:
mylog('none', [f'[{pluginName}] Pi-hole auth request timed out. Try increasing PIHOLEAPI_RUN_TIMEOUT.'])
return False
except requests.exceptions.ConnectionError:
mylog('none', [f'[{pluginName}] Connection error during Pi-hole auth. Check PIHOLEAPI_URL and PIHOLEAPI_PASSWORD'])
return False
except Exception as e:
mylog('none', [f'[{pluginName}] Unexpected auth error: {e}'])
return False
try:
response_json = resp.json()
except Exception:
mylog('none', [f'[{pluginName}] Unable to parse Pi-hole auth response JSON.'])
return False
session_data = response_json.get('session', {})
if session_data.get('valid', False):
PIHOLEAPI_SES_VALID = True
PIHOLEAPI_SES_SID = session_data.get('sid')
# csrf might not be present if no password set
PIHOLEAPI_SES_CSRF = session_data.get('csrf')
mylog('verbose', [f'[{pluginName}] Authenticated to Pi-hole (sid present).'])
return True
else:
mylog('none', [f'[{pluginName}] Pi-hole auth required or failed.'])
return False
# ------------------------------------------------------------------
def pihole_api_deauth():
"""Logout from Pi-hole v6 API (best-effort)."""
global PIHOLEAPI_SES_VALID, PIHOLEAPI_SES_SID, PIHOLEAPI_SES_CSRF
if not PIHOLEAPI_URL:
return
if not PIHOLEAPI_SES_SID:
return
headers = {"X-FTL-SID": PIHOLEAPI_SES_SID}
try:
requests.delete(PIHOLEAPI_URL + 'api/auth', headers=headers, verify=PIHOLEAPI_VERIFY_SSL, timeout=PIHOLEAPI_RUN_TIMEOUT)
except Exception:
# ignore errors on logout
pass
PIHOLEAPI_SES_VALID = False
PIHOLEAPI_SES_SID = None
PIHOLEAPI_SES_CSRF = None
# ------------------------------------------------------------------
def get_pihole_interface_data():
"""Return dict mapping mac -> [ipv4 addresses] from Pi-hole interfaces endpoint."""
result = {}
if not PIHOLEAPI_SES_VALID:
return result
headers = {"X-FTL-SID": PIHOLEAPI_SES_SID}
if PIHOLEAPI_SES_CSRF:
headers["X-FTL-CSRF"] = PIHOLEAPI_SES_CSRF
try:
resp = requests.get(PIHOLEAPI_URL + 'api/network/interfaces', headers=headers, verify=PIHOLEAPI_VERIFY_SSL, timeout=PIHOLEAPI_RUN_TIMEOUT)
resp.raise_for_status()
data = resp.json()
except Exception as e:
mylog('none', [f'[{pluginName}] Failed to fetch Pi-hole interfaces: {e}'])
return result
for interface in data.get('interfaces', []):
mac_address = interface.get('address')
if not mac_address or mac_address == "00:00:00:00:00:00":
continue
addrs = []
for addr in interface.get('addresses', []):
if addr.get('family') == 'inet':
a = addr.get('address')
if a:
addrs.append(a)
if addrs:
result[mac_address] = addrs
return result
# ------------------------------------------------------------------
def get_pihole_network_devices():
"""Return list of devices from Pi-hole v6 API (devices endpoint)."""
devices = []
# return empty list if no session available
if not PIHOLEAPI_SES_VALID:
return devices
# prepare headers
headers = {"X-FTL-SID": PIHOLEAPI_SES_SID}
if PIHOLEAPI_SES_CSRF:
headers["X-FTL-CSRF"] = PIHOLEAPI_SES_CSRF
params = {
'max_devices': str(PIHOLEAPI_API_MAXCLIENTS),
'max_addresses': '2'
}
try:
resp = requests.get(PIHOLEAPI_URL + 'api/network/devices', headers=headers, params=params, verify=PIHOLEAPI_VERIFY_SSL, timeout=PIHOLEAPI_RUN_TIMEOUT)
resp.raise_for_status()
data = resp.json()
mylog('debug', [f'[{pluginName}] Pi-hole API returned data: {json.dumps(data)}'])
except Exception as e:
mylog('none', [f'[{pluginName}] Failed to fetch Pi-hole devices: {e}'])
return devices
# The API returns 'devices' list
return data.get('devices', [])
# ------------------------------------------------------------------
def gather_device_entries():
"""
Build a list of device entries suitable for Plugin_Objects.add_object.
Each entry is a dict with: mac, ip, name, macVendor, lastQuery
"""
entries = []
iface_map = get_pihole_interface_data()
devices = get_pihole_network_devices()
now_ts = int(datetime.datetime.now().timestamp())
for device in devices:
hwaddr = device.get('hwaddr')
if not hwaddr or hwaddr == "00:00:00:00:00:00":
continue
macVendor = device.get('macVendor', '')
lastQuery = device.get('lastQuery')
# 'ips' is a list of dicts: {ip, name}
for ip_info in device.get('ips', []):
ip = ip_info.get('ip')
if not ip:
continue
name = ip_info.get('name') or '(unknown)'
# mark active if ip present on local interfaces
for mac, iplist in iface_map.items():
if ip in iplist:
lastQuery = str(now_ts)
entries.append({
'mac': hwaddr.lower(),
'ip': ip,
'name': name,
'macVendor': macVendor,
'lastQuery': str(lastQuery) if lastQuery is not None else ''
})
return entries
# ------------------------------------------------------------------
def main():
"""Main plugin entrypoint."""
global PIHOLEAPI_URL, PIHOLEAPI_PASSWORD, PIHOLEAPI_API_MAXCLIENTS, PIHOLEAPI_VERIFY_SSL, PIHOLEAPI_RUN_TIMEOUT
mylog('verbose', [f'[{pluginName}] start script.'])
# Load settings from NAX config
PIHOLEAPI_URL = get_setting_value('PIHOLEAPI_URL')
# ensure trailing slash
if not PIHOLEAPI_URL.endswith('/'):
PIHOLEAPI_URL += '/'
PIHOLEAPI_PASSWORD = get_setting_value('PIHOLEAPI_PASSWORD')
PIHOLEAPI_API_MAXCLIENTS = get_setting_value('PIHOLEAPI_API_MAXCLIENTS')
# Accept boolean or string "True"/"False"
PIHOLEAPI_VERIFY_SSL = get_setting_value('PIHOLEAPI_SSL_VERIFY')
PIHOLEAPI_RUN_TIMEOUT = get_setting_value('PIHOLEAPI_RUN_TIMEOUT')
# Authenticate
if not pihole_api_auth():
mylog('none', [f'[{pluginName}] Authentication failed — no devices imported.'])
return 1
try:
device_entries = gather_device_entries()
if not device_entries:
mylog('verbose', [f'[{pluginName}] No devices found on Pi-hole.'])
else:
for entry in device_entries:
if is_mac(entry['mac']):
# Map to Plugin_Objects fields
mylog('verbose', [f'[{pluginName}] found: {entry['name']}|{entry['mac']}|{entry['ip']}'])
plugin_objects.add_object(
primaryId=str(entry['mac']),
secondaryId=str(entry['ip']),
watched1=str(entry['name']),
watched2=str(entry['macVendor']),
watched3=str(entry['lastQuery']),
watched4="",
extra=pluginName,
foreignKey=str(entry['mac'])
)
else:
mylog('verbose', [f'[{pluginName}] Skipping invalid MAC: {entry['name']}|{entry['mac']}|{entry['ip']}'])
# Write result file for NetAlertX to ingest
plugin_objects.write_result_file()
mylog('verbose', [f'[{pluginName}] Script finished. Imported {len(device_entries)} entries.'])
finally:
# Deauth best-effort
pihole_api_deauth()
return 0
if __name__ == '__main__':
main()