OMDSDNOPENAPI cleanup + opensense script

This commit is contained in:
jokob-sk
2025-02-24 12:52:12 +11:00
parent 65a0f90bd8
commit 2a25f38268
7 changed files with 339 additions and 1 deletions

View File

@@ -51,6 +51,7 @@ Device-detecting plugins insert values into the `CurrentScan` database table. T
| `NTFPRCS` | ⚙ | Notification processing | | Yes | Template | [notification_processing](https://github.com/jokob-sk/NetAlertX/tree/main/front/plugins/notification_processing/)|
| `NTFY` | ▶️ | NTFY notifications | | | Script | [_publisher_ntfy](https://github.com/jokob-sk/NetAlertX/tree/main/front/plugins/_publisher_ntfy/) |
| `OMDSDN` | 📥/🆎 | OMADA TP-Link import | 🖧 🔄 | | Script | [omada_sdn_imp](https://github.com/jokob-sk/NetAlertX/tree/main/front/plugins/omada_sdn_imp/) |
| `OMDSDNOPENAPI`| 📥/🆎 | OMADA TP-Link import via OpenAPI | 🖧 | | Script | [omada_sdn_openapi](https://github.com/jokob-sk/NetAlertX/tree/main/front/plugins/omada_sdn_openapi/) |
| `PIHOLE` | 🔍/🆎/📥| Pi-hole device import & sync | | | SQLite DB | [pihole_scan](https://github.com/jokob-sk/NetAlertX/tree/main/front/plugins/pihole_scan/) |
| `PUSHSAFER` | ▶️ | Pushsafer notifications | | | Script | [_publisher_pushsafer](https://github.com/jokob-sk/NetAlertX/tree/main/front/plugins/_publisher_pushsafer/) |
| `PUSHOVER` | ▶️ | Pushover notifications | | | Script | [_publisher_pushover](https://github.com/jokob-sk/NetAlertX/tree/main/front/plugins/_publisher_pushover/) |

View File

@@ -1,5 +1,5 @@
{
"code_name": "omada_sdn_openapi_import",
"code_name": "omada_sdn_openapi",
"unique_prefix": "OMDSDNOPENAPI",
"plugin_type": "device_scanner",
"execution_order" : "Layer_0",

View File

Before

Width:  |  Height:  |  Size: 176 KiB

After

Width:  |  Height:  |  Size: 176 KiB

View File

@@ -0,0 +1,78 @@
# NetAlertX OPNsense DHCP Lease Converter
## Overview
This script retrieves DHCP lease data from an OPNsense firewall over SSH and converts it into the `dnsmasq` lease file format. You can combine it with the `DHCPLLSS` plugin to ingest devices from OPNsense.
## Features
- Connects to OPNsense via SSH to retrieve DHCP lease data.
- Parses active DHCP leases.
- Converts lease data to `dnsmasq` lease format.
- Saves the converted lease file to a specified output location.
- Supports password and key-based SSH authentication.
- Includes a debug mode for troubleshooting.
## Requirements
- Python 3
- `paramiko` library (for SSH connection)
- An OPNsense firewall with SSH access enabled
## Usage
Run the script with the required parameters:
```sh
./script.py --host <OPNsense_IP> --username <SSH_User> --output <Output_File>
```
### Available Options
| Option | Description |
|--------------|-------------|
| `--host` | OPNsense hostname or IP address (Required) |
| `--username` | SSH username (Required) |
| `--password` | SSH password (Optional if using key-based authentication) |
| `--key-file` | Path to SSH private key file (Optional) |
| `--port` | SSH port (Default: 22) |
| `--output` | Output file path for converted lease file (Required) |
| `--debug` | Enable debug logging (Optional) |
### Example Commands
#### Using Password Authentication
```sh
./script.py --host 192.168.1.1 --username admin --password mypassword --output /tmp/dnsmasq.leases
```
#### Using SSH Key Authentication
```sh
./script.py --host 192.168.1.1 --username admin --key-file ~/.ssh/id_rsa --output /tmp/dnsmasq.leases
```
## Output Format
The script generates a `dnsmasq`-formatted lease file with lines structured as:
```
[epoch timestamp] [MAC address] [IP address] [hostname] [client ID]
```
Example:
```sh
1708212000 00:11:22:33:44:55 192.168.1.100 my-device 01:00:11:22:33:44:55
```
## Troubleshooting
- **Connection issues?** Ensure SSH is enabled on the OPNsense device and the correct credentials are used.
- **No lease data?** Verify the DHCP lease file exists at `/var/dhcpd/var/db/dhcpd.leases`.
- **Permission denied?** Ensure your SSH user has the required permissions to access the lease file.
- **Debugging:** Run the script with the `--debug` flag to see more details.
### Other info
- Version: 1.0
- Author: [im-redactd](https://github.com/im-redactd)
- Release Date: 24-Feb-2025
> [!NOTE]
> This is a community supplied script and not maintained.

View File

@@ -0,0 +1,259 @@
#!/usr/bin/env python3
import paramiko
import re
from datetime import datetime
import argparse
import sys
from pathlib import Path
import time
import logging
def setup_logging(debug=False):
"""Configure logging based on debug flag."""
level = logging.DEBUG if debug else logging.INFO
logging.basicConfig(
level=level,
format='%(asctime)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
return logging.getLogger(__name__)
def parse_timestamp(date_str):
"""Convert OPNsense timestamp to Unix epoch time."""
try:
# Format from OPNsense: "1 2025/02/17 20:08:29"
# Remove the leading number and convert
clean_date = ' '.join(date_str.split()[1:])
dt = datetime.strptime(clean_date, '%Y/%m/%d %H:%M:%S')
return int(dt.timestamp())
except Exception as e:
logger.error(f"Failed to parse timestamp: {date_str}")
return None
def get_lease_file(hostname, username, password=None, key_filename=None, port=22, debug=False):
"""Retrieve the lease file content from OPNsense via SSH."""
logger = logging.getLogger(__name__)
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
logger.debug(f"Attempting to connect to {hostname}:{port} as {username}")
ssh.connect(hostname, port=port, username=username,
password=password, key_filename=key_filename)
# Get an interactive shell session
logger.debug("Opening interactive SSH channel")
channel = ssh.invoke_shell()
time.sleep(2) # Wait for the menu to load
if debug:
# Read and log the initial menu
while channel.recv_ready():
initial_output = channel.recv(4096).decode('utf-8')
logger.debug(f"Initial menu output:\n{initial_output}")
# Send '8' to access the shell
logger.debug("Sending option 8 to access shell")
channel.send('8\n')
time.sleep(2) # Wait for shell access
# Send the command to read the lease file
command = 'cat /var/dhcpd/var/db/dhcpd.leases\n'
logger.debug(f"Sending command: {command}")
channel.send(command)
time.sleep(2) # Wait for command execution
# Receive the output
output = ""
while channel.recv_ready():
chunk = channel.recv(4096).decode('utf-8')
output += chunk
if debug:
logger.debug(f"Received chunk:\n{chunk}")
# Clean up the output by removing the command echo and shell prompts
lines = output.split('\n')
# Remove first line (command echo) and any lines containing shell prompts
cleaned_lines = [line for line in lines
if not line.strip().startswith(command.strip())
and not line.strip().endswith('> ')
and not line.strip().endswith('# ')]
cleaned_output = '\n'.join(cleaned_lines)
logger.debug(f"Final cleaned output length: {len(cleaned_output)} characters")
# Exit the shell properly
channel.send('exit\n')
ssh.close()
return cleaned_output
except Exception as e:
logger.error(f"Error during SSH operation: {str(e)}")
raise
def parse_lease_file(lease_content):
"""Parse the DHCP lease file content and return a list of valid leases."""
logger = logging.getLogger(__name__)
leases = []
current_lease = None
for line in lease_content.split('\n'):
line = line.strip()
if not line or line.startswith('root@') or line.startswith('#'):
continue
logger.debug(f"Processing line: {line}")
# Start of a lease block
if line.startswith('lease'):
if current_lease:
leases.append(current_lease)
logger.debug(f"Added lease: {current_lease}")
current_lease = {}
ip = line.split()[1]
current_lease['ip'] = ip
# MAC address
elif 'hardware ethernet' in line:
mac = line.split()[2].rstrip(';')
current_lease['mac'] = mac
# Hostname
elif 'client-hostname' in line:
hostname = line.split('"')[1] if '"' in line else line.split()[1].rstrip(';')
current_lease['hostname'] = hostname
# Lease state
elif line.startswith('binding state '):
state = line.split('binding state')[1].strip().rstrip(';')
current_lease['state'] = state
# End time
elif line.startswith('ends'):
date_str = ' '.join(line.split()[1:]).rstrip(';')
current_lease['ends'] = date_str
# Client ID
elif line.startswith('uid'):
uid = line.split('"')[1] if '"' in line else line.split()[1].rstrip(';')
current_lease['uid'] = uid
# End of lease block
elif line.strip() == '}':
if current_lease:
leases.append(current_lease)
logger.debug(f"Added lease at block end: {current_lease}")
current_lease = None
# Add the last lease if exists
if current_lease:
leases.append(current_lease)
logger.debug(f"Added final lease: {current_lease}")
# Filter only active leases
active_leases = [lease for lease in leases
if lease.get('state') == 'active'
and 'mac' in lease
and 'ip' in lease]
logger.debug(f"Found {len(active_leases)} active leases out of {len(leases)} total leases")
logger.debug("Active leases:")
for lease in active_leases:
logger.debug(f" {lease}")
return active_leases
def convert_to_dnsmasq(leases):
"""Convert leases to dnsmasq lease file format."""
logger = logging.getLogger(__name__)
dnsmasq_lines = []
for lease in leases:
logger.debug(f"Converting lease: {lease}")
if 'mac' in lease and 'ip' in lease:
# Get expiry time as Unix timestamp
expiry = lease.get('ends', '')
if expiry:
expiry_epoch = parse_timestamp(expiry)
if not expiry_epoch:
logger.error(f"Skipping lease due to invalid timestamp: {lease}")
continue
else:
logger.error(f"Skipping lease due to missing expiry time: {lease}")
continue
# Get required fields
mac = lease['mac']
ip = lease['ip']
hostname = lease.get('hostname', '*')
# Format client ID - if not available, use MAC address with '01:' prefix
client_id = lease.get('uid', f"01:{mac}")
# Clean up client ID - remove escape sequences and quotes
client_id = client_id.replace('\\', '').replace('"', '')
if not client_id.startswith('01:'):
client_id = f"01:{mac}"
# Format: [epoch timestamp] [MAC address] [IP address] [hostname] [client ID]
line = f"{expiry_epoch} {mac} {ip} {hostname} {client_id}"
dnsmasq_lines.append(line)
logger.debug(f"Added dnsmasq lease line: {line}")
return dnsmasq_lines
def main():
parser = argparse.ArgumentParser(description='Convert OPNsense DHCP leases to dnsmasq format')
parser.add_argument('--host', required=True, help='OPNsense hostname or IP')
parser.add_argument('--username', required=True, help='SSH username')
parser.add_argument('--password', help='SSH password (if not using key-based auth)')
parser.add_argument('--key-file', help='SSH private key file path')
parser.add_argument('--port', type=int, default=22, help='SSH port (default: 22)')
parser.add_argument('--output', required=True, help='Output file path')
parser.add_argument('--debug', action='store_true', help='Enable debug logging')
args = parser.parse_args()
# Setup logging
logger = setup_logging(args.debug)
try:
# Get lease file content
logger.info("Retrieving lease file from OPNsense")
lease_content = get_lease_file(
args.host,
args.username,
password=args.password,
key_filename=args.key_file,
port=args.port,
debug=args.debug
)
# Parse leases
logger.info("Parsing lease file content")
leases = parse_lease_file(lease_content)
# Convert to dnsmasq format
logger.info("Converting to dnsmasq format")
dnsmasq_lines = convert_to_dnsmasq(leases)
# Write output file
output_path = Path(args.output)
output_path.parent.mkdir(parents=True, exist_ok=True)
logger.info(f"Writing output to {args.output}")
with open(output_path, 'w') as f:
f.write('\n'.join(dnsmasq_lines) + '\n')
logger.info(f"Successfully wrote {len(dnsmasq_lines)} entries to {args.output}")
except Exception as e:
logger.error(f"Error: {str(e)}")
sys.exit(1)
if __name__ == '__main__':
main()