#!/usr/bin/env python # #------------------------------------------------------------------------------- # Pi.Alert v2.70 / 2021-02-01 # Open Source Network Guard / WIFI & LAN intrusion detector # # pialert.py - Back module. Network scanner #------------------------------------------------------------------------------- # Puche 2021 / 2022+ jokob jokob@duck.com GNU GPLv3 #------------------------------------------------------------------------------- #=============================================================================== # IMPORTS #=============================================================================== from __future__ import print_function import sys from collections import namedtuple import subprocess import os import re import time import decimal import datetime from datetime import timedelta import sqlite3 import socket import io import smtplib import csv import json import requests import threading from pathlib import Path from cron_converter import Cron from json2table import convert import hashlib import multiprocessing # pialert modules from const import * from conf import * # from config import DIG_GET_IP_ARG, ENABLE_PLUGINS from logger import append_line_to_file, mylog, print_log, logResult from helper import checkIPV4, filePermissions, importConfigs, isNewVersion, removeDuplicateNewLines, timeNow, write_file from database import * from internet import check_IP_format, check_internet_IP, get_internet_IP from api import update_api from files import get_file_content from mqtt import mqtt_start from pialert.arpscan import execute_arpscan from pialert.mac_vendor import query_MAC_vendor, update_devices_MAC_vendors from pialert.networkscan import scan_network from pialert.nmapscan import performNmapScan from pialert.pholusscan import performPholusScan, resolve_device_name_pholus from pialert.pihole import copy_pihole_network, read_DHCP_leases from pialert.reporting import send_apprise, send_email, send_notifications, send_ntfy, send_pushsafer, send_webhook, skip_repeated_notifications from plugin import execute_plugin, get_plugin_setting, print_plugin_info, run_plugin_scripts # Global variables userSubnets = [] changedPorts_json_struc = None time_started = datetime.datetime.now() cron_instance = Cron() log_timestamp = time_started lastTimeImported = 0 sql_connection = None #=============================================================================== #=============================================================================== # MAIN #=============================================================================== #=============================================================================== cycle = "" check_report = [1, "internet_IP", "update_vendors_silent"] plugins_once_run = False # timestamps of last execution times startTime = time_started now_minus_24h = time_started - datetime.timedelta(hours = 24) last_network_scan = now_minus_24h last_internet_IP_scan = now_minus_24h last_run = now_minus_24h last_cleanup = now_minus_24h last_update_vendors = time_started - datetime.timedelta(days = 6) # update vendors 24h after first run and then once a week # indicates, if a new version is available newVersionAvailable = False def main (): # Initialize global variables global time_started, cycle, last_network_scan, last_internet_IP_scan, last_run, last_cleanup, last_update_vendors # second set of global variables global startTime, log_timestamp, plugins_once_run # To-Do all these DB Globals need to be removed global db, sql, sql_connection # check file permissions and fix if required filePermissions() # Open DB once and keep open # Opening / closing DB frequently actually casues more issues db = DB() # instance of class DB db.openDB() # To-Do replace the following to lines with the db class sql_connection = db.sql_connection sql = db.sql # Upgrade DB if needed upgradeDB(db) #=============================================================================== # This is the main loop of Pi.Alert #=============================================================================== while True: # update time started time_started = datetime.datetime.now() mylog('debug', ['[', timeNow(), '] [MAIN] Stating loop']) # re-load user configuration and plugins importConfigs(db) # Handle plugins executed ONCE if ENABLE_PLUGINS and plugins_once_run == False: run_plugin_scripts(db, 'once') plugins_once_run = True # check if there is a front end initiated event which needs to be executed check_and_run_event(db) # Update API endpoints update_api() # proceed if 1 minute passed if last_run + datetime.timedelta(minutes=1) < time_started : # last time any scan or maintenance/upkeep was run last_run = time_started # Header updateState(db,"Process: Start") # Timestamp startTime = time_started startTime = startTime.replace (microsecond=0) # Check if any plugins need to run on schedule if ENABLE_PLUGINS: run_plugin_scripts(db,'schedule') # determine run/scan type based on passed time # -------------------------------------------- # check for changes in Internet IP if last_internet_IP_scan + datetime.timedelta(minutes=3) < time_started: cycle = 'internet_IP' last_internet_IP_scan = time_started check_internet_IP(db,DIG_GET_IP_ARG) # Update vendors once a week if last_update_vendors + datetime.timedelta(days = 7) < time_started: last_update_vendors = time_started cycle = 'update_vendors' mylog('verbose', ['[', timeNow(), '] cycle:',cycle]) update_devices_MAC_vendors() # Execute scheduled or one-off Pholus scan if enabled and run conditions fulfilled if PHOLUS_RUN == "schedule" or PHOLUS_RUN == "once": pholusSchedule = [sch for sch in mySchedules if sch.service == "pholus"][0] run = False # run once after application starts if PHOLUS_RUN == "once" and pholusSchedule.last_run == 0: run = True # run if overdue scheduled time if PHOLUS_RUN == "schedule": run = pholusSchedule.runScheduleCheck() if run: pholusSchedule.last_run = datetime.datetime.now(tz).replace(microsecond=0) performPholusScan(db, PHOLUS_RUN_TIMEOUT, userSubnets) # Execute scheduled or one-off Nmap scan if enabled and run conditions fulfilled if NMAP_RUN == "schedule" or NMAP_RUN == "once": nmapSchedule = [sch for sch in mySchedules if sch.service == "nmap"][0] run = False # run once after application starts if NMAP_RUN == "once" and nmapSchedule.last_run == 0: run = True # run if overdue scheduled time if NMAP_RUN == "schedule": run = nmapSchedule.runScheduleCheck() if run: nmapSchedule.last_run = datetime.datetime.now(tz).replace(microsecond=0) performNmapScan(db, get_all_devices(db)) # Perform a network scan via arp-scan or pihole if last_network_scan + datetime.timedelta(minutes=SCAN_CYCLE_MINUTES) < time_started: last_network_scan = time_started cycle = 1 # network scan mylog('verbose', ['[', timeNow(), '] cycle:',cycle]) # scan_network() # DEBUG start ++++++++++++++++++++++++++++++++++++++++++++++++++++++ # Start scan_network as a process p = multiprocessing.Process(target=scan_network) p.start() # Wait for 3600 seconds (max 1h) or until process finishes p.join(3600) # If thread is still active if p.is_alive(): print("DEBUG scan_network running too long - let\'s kill it") mylog('info', [' DEBUG scan_network running too long - let\'s kill it']) # Terminate - may not work if process is stuck for good p.terminate() # OR Kill - will work for sure, no chance for process to finish nicely however # p.kill() p.join() # DEBUG end ++++++++++++++++++++++++++++++++++++++++++++++++++++++ # Reporting if cycle in check_report: # Check if new devices found sql.execute (sql_new_devices) newDevices = sql.fetchall() db.commitDB() # new devices were found if len(newDevices) > 0: # run all plugins registered to be run when new devices are found if ENABLE_PLUGINS: run_plugin_scripts(db, 'on_new_device') # Scan newly found devices with Nmap if enabled if NMAP_ACTIVE and len(newDevices) > 0: performNmapScan( db, newDevices) # send all configured notifications send_notifications(db) # clean up the DB once a day if last_cleanup + datetime.timedelta(hours = 24) < time_started: last_cleanup = time_started cycle = 'cleanup' mylog('verbose', ['[', timeNow(), '] cycle:',cycle]) db.cleanup_database(startTime, DAYS_TO_KEEP_EVENTS, PHOLUS_DAYS_DATA) # Commit SQL db.commitDB() # Final message if cycle != "": action = str(cycle) if action == "1": action = "network_scan" mylog('verbose', ['[', timeNow(), '] Last action: ', action]) cycle = "" mylog('verbose', ['[', timeNow(), '] cycle:',cycle]) # Footer updateState(db,"Process: Wait") mylog('verbose', ['[', timeNow(), '] Process: Wait']) else: # do something cycle = "" mylog('verbose', ['[', timeNow(), '] [MAIN] waiting to start next loop']) #loop time.sleep(5) # wait for N seconds #=============================================================================== # UTIL #=============================================================================== #------------------------------------------------------------------------------- def check_and_run_event(db): sql.execute(""" select * from Parameters where par_ID = "Front_Event" """) rows = sql.fetchall() event, param = ['',''] if len(rows) > 0 and rows[0]['par_Value'] != 'finished': event = rows[0]['par_Value'].split('|')[0] param = rows[0]['par_Value'].split('|')[1] else: return if event == 'test': handle_test(param) if event == 'run': handle_run(param) # clear event execution flag sql.execute ("UPDATE Parameters SET par_Value='finished' WHERE par_ID='Front_Event'") # commit to DB db.commitDB() #------------------------------------------------------------------------------- def handle_run(runType): global last_network_scan mylog('info', ['[', timeNow(), '] START Run: ', runType]) if runType == 'ENABLE_ARPSCAN': last_network_scan = now_minus_24h mylog('info', ['[', timeNow(), '] END Run: ', runType]) #------------------------------------------------------------------------------- def handle_test(testType): mylog('info', ['[', timeNow(), '] START Test: ', testType]) # Open text sample sample_txt = get_file_content(pialertPath + '/back/report_sample.txt') # Open html sample sample_html = get_file_content(pialertPath + '/back/report_sample.html') # Open json sample and get only the payload part sample_json_payload = json.loads(get_file_content(pialertPath + '/back/webhook_json_sample.json'))[0]["body"]["attachments"][0]["text"] if testType == 'REPORT_MAIL': send_email(sample_txt, sample_html) if testType == 'REPORT_WEBHOOK': send_webhook (sample_json_payload, sample_txt) if testType == 'REPORT_APPRISE': send_apprise (sample_html, sample_txt) if testType == 'REPORT_NTFY': send_ntfy (sample_txt) if testType == 'REPORT_PUSHSAFER': send_pushsafer (sample_txt) mylog('info', ['[', timeNow(), '] END Test: ', testType]) #------------------------------------------------------------------------------- #------------------------------------------------------------------------------- # Plugins #------------------------------------------------------------------------------- #=============================================================================== # BEGIN #=============================================================================== if __name__ == '__main__': sys.exit(main())