mirror of
https://github.com/jokob-sk/NetAlertX.git
synced 2026-04-04 17:21:23 -07:00
freebox plugin version 2
This commit is contained in:
@@ -8,10 +8,11 @@ from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import cast
|
||||
import socket
|
||||
import aiofreepybox
|
||||
from aiofreepybox import Freepybox
|
||||
from aiofreepybox.api.lan import Lan
|
||||
from aiofreepybox.exceptions import NotOpenError, AuthorizationError
|
||||
import freebox_api
|
||||
from freebox_api import Freepybox
|
||||
from freebox_api.api.lan import Lan
|
||||
from freebox_api.api.system import System
|
||||
from freebox_api.exceptions import NotOpenError, AuthorizationError
|
||||
|
||||
# Define the installation path and extend the system path for plugin imports
|
||||
INSTALL_PATH = os.getenv('NETALERTX_APP', '/app')
|
||||
@@ -83,8 +84,7 @@ def map_device_type(type: str):
|
||||
|
||||
async def get_device_data(api_version: int, api_address: str, api_port: int):
|
||||
# ensure existence of db path
|
||||
config_base = Path(os.getenv("NETALERTX_CONFIG", "/data/config"))
|
||||
data_dir = config_base / "freeboxdb"
|
||||
data_dir = Path(os.getenv("NETALERTX_CONFIG", "/data/config")) / "freeboxdb"
|
||||
data_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Instantiate Freepybox class using default application descriptor
|
||||
@@ -93,25 +93,27 @@ async def get_device_data(api_version: int, api_address: str, api_port: int):
|
||||
app_desc={
|
||||
"app_id": "netalertx",
|
||||
"app_name": "NetAlertX",
|
||||
"app_version": aiofreepybox.__version__,
|
||||
"app_version": freebox_api.__version__,
|
||||
"device_name": socket.gethostname(),
|
||||
},
|
||||
api_version="v" + str(api_version),
|
||||
data_dir=data_dir,
|
||||
token_file=data_dir / "token",
|
||||
)
|
||||
|
||||
# Connect to the freebox
|
||||
# Be ready to authorize the application on the Freebox if you run this
|
||||
# for the first time
|
||||
try:
|
||||
await fbx.open(host=api_address, port=api_port)
|
||||
await fbx.open(host=api_address, port=str(api_port))
|
||||
except NotOpenError as e:
|
||||
mylog("verbose", [f"[{pluginName}] Error connecting to freebox: {e}"])
|
||||
return (),()
|
||||
except AuthorizationError as e:
|
||||
mylog("verbose", [f"[{pluginName}] Auth error: {str(e)}"])
|
||||
return (),()
|
||||
|
||||
# get also info of the freebox itself
|
||||
config = await fbx.system.get_config()
|
||||
config = await cast(System, fbx.system).get_config()
|
||||
freebox = await cast(Lan, fbx.lan).get_config()
|
||||
hosts = await cast(Lan, fbx.lan).get_hosts_list()
|
||||
assert config is not None
|
||||
@@ -146,14 +148,14 @@ def main():
|
||||
mylog("verbose", [hosts])
|
||||
|
||||
plugin_objects.add_object(
|
||||
primaryId=freebox["mac"],
|
||||
secondaryId=freebox["ip"],
|
||||
watched1=freebox["name"],
|
||||
watched2=freebox["operator"],
|
||||
primaryId=freebox["mac"], # type: ignore
|
||||
secondaryId=freebox["ip"], # type: ignore
|
||||
watched1=freebox["name"], # type: ignore
|
||||
watched2=freebox["operator"], # type: ignore
|
||||
watched3="Gateway",
|
||||
watched4=timeNowUTC(),
|
||||
extra="",
|
||||
foreignKey=freebox["mac"],
|
||||
foreignKey=freebox["mac"], # type: ignore
|
||||
)
|
||||
for host in hosts:
|
||||
# Check if 'l3connectivities' exists and is a list
|
||||
@@ -175,7 +177,7 @@ def main():
|
||||
# Optional: Log or handle hosts without 'l3connectivities'
|
||||
mylog("verbose", [f"[{pluginName}] Host missing 'l3connectivities': {host}"])
|
||||
|
||||
# commit result
|
||||
# Commit result
|
||||
plugin_objects.write_result_file()
|
||||
|
||||
return 0
|
||||
|
||||
Reference in New Issue
Block a user