Files
NetAlertX/front/plugins/omada_sdn_imp/testre.py
2024-07-17 17:58:43 -04:00

195 lines
5.7 KiB
Python

import re
""""
how to rebuild and re-run...
savefolder=~/naxdev/NetAlertX.v7
cd ~/naxdev
mv NetAlertX $savefolder
gh repo clone FlyingToto/NetAlertX
cd NetAlertX
ln -s ../docker-compose.yml.ffsb42 .
ln -s ../.env.omada.ffsb42 .
cd front/plugins/omada_sdn_imp/
cp -p $savefoder/front/plugins/omada_sdn_imp/omada_sdn.py* .
cp -p $savefoder/front/plugins/omada_sdn_imp/README.md .
cp -p $savefoder/front/plugins/omada_sdn_imp/omada_account_sample.png .
cp -p $savefoder/front/plugins/omada_sdn_imp/testre.py .
#cp -p $savefoder/front/plugins/omada_sdn_imp/config.json config.json.v6
cd ~/naxdev/NetAlertX
sudo docker-compose --env-file .env.omada.ffsb42 -f ./docker-compose.yml.ffsb42 up
to gather data for Boris:
today=$(date +%Y_%m_%d__%H_%M)
mkdir /drives/c/temp/4boris/$today
cd /drives/c/temp/4boris/$today
scp hal:~/naxdev/logs/app.log .
scp hal:~/naxdev/NetAlertX/front/plugins/omada_sdn_imp/* .
gzip -c app.log > app_$today.log.gz
scp hal:~/naxdev/NetAlertX/front/plugins/omada_sdn_imp/omada_sdn.py /drives/c/temp/4boris/
"""
def extract_mac_addresses(text):
mac_pattern = r"([0-9A-Fa-f]{2}[:-][0-9A-Fa-f]{2}[:-][0-9A-Fa-f]{2}[:-][0-9A-Fa-f]{2}[:-][0-9A-Fa-f]{2}[:-][0-9A-Fa-f]{2})"
#mac_pattern = r'([0-9A-Fa-f]{2}[:-]){5}([0-9A-Fa-f]{2})'
#r"(([0-9A-F]{2}-){5}[0-9A-F]{2})"
#r"([0-9A-Fa-f]{2}[:-]){5}([0-9A-Fa-f]{2})"
#r"([0-9A-Fa-f]{2}[:-][0-9A-Fa-f]{2}[:-][0-9A-Fa-f]{2}[:-][0-9A-Fa-f]{2}[:-][0-9A-Fa-f]{2}[:-][0-9A-Fa-f]{2})"
mac_addresses = re.findall(mac_pattern, text)
return ["".join(parts) for parts in mac_addresses]
# Example usage:
foo = """
Name: office
Address: 0C-80-63-69-C4-D1 (192.168.0.5)
Status: CONNECTED (CONNECTED)
Ports: 28
Supports PoE: False
Model: T1600G-28TS v3.0
LED Setting: SITE_SETTINGS
Uptime: 5day(s) 22h 39m 6s
Uplink switch: D8-07-B6-71-FF-7F office24
Downlink devices:
- 40-AE-30-A5-A7-50 ompapaoffice
- B0-95-75-46-0C-39 pantry12
"""
mac_list = extract_mac_addresses(foo)
print("mac list",mac_list)
# ['0C-80-63-69-C4-D1', 'D8-07-B6-71-FF-7F', '40-AE-30-A5-A7-50', 'B0-95-75-46-0C-39']
# ['C4-:D1', 'FF-:7F', 'A7-:50', '0C-:39']
linked_switches_and_ports_by_mac = {}
foo = """"
something
some BOB12
blah BOB23
--- BEGIN ---
something else BOB12
blah BOB23
--- END ---
"""
def extract_BOB_patterns(foo):
pattern = r"BOB\d{2}(?=.*BEGIN)"
matches = re.findall(pattern, foo, re.DOTALL)
return matches
BOBresult = extract_BOB_patterns(foo)
print("BOB:",BOBresult) # Output: ['BOB12', 'BOB23']
#0C-80-63-69-C4-D1
clientmac_by_switchmac_by_switchportSSID = {}
switch_mac_and_ports_by_clientmac = {}
def extract_uplinks_mac_and_ports(tplink_device_dump):
mac_switches = []
mac_pattern = r"([0-9A-Fa-f]{2}[:-][0-9A-Fa-f]{2}[:-][0-9A-Fa-f]{2}[:-][0-9A-Fa-f]{2}[:-][0-9A-Fa-f]{2}[:-][0-9A-Fa-f]{2})(?=.*BEGIN)"
mac_addresses = re.findall(mac_pattern, tplink_device_dump,re.DOTALL)
mac_switches = ["".join(parts) for parts in mac_addresses]
print(" mac_switches1=",mac_switches)
mymac = mac_switches[0]
mylinks = mac_switches[1:]
for mylink in mylinks:
port_pattern = r"(?=\{.*\"port\"\: )([0-9]+)(?=.*"+re.escape(mylink)+r")"
port_pattern = r"(?:{/s\"port\"\: )([0-9]+)(?:[!\}].*"+re.escape(mylink)+r")"
#port_pattern = rf"{{.*?{found_mac}.*?port\s*:\s*(\d+).*?}}"
#port_pattern = rf"{{.*?.*?port\s*:\s*(\d+)[!\\}]*{mylink}?}}"
port_pattern = r"(?:\{[!\}]port/s:/s)([0-9]+\,)(?:[!\}]*"+re.escape(mylink)+r"[!\{]*\})"
#port_pattern = r"(?:\{.*\"port\"\: )([0-9]+)(?=.*"+re.escape(mylink)+r")"
port_pattern = r"(?:{[^}]*\"port\"\: )([0-9]+)(?=[^}]*"+re.escape(mylink)+r")"
myport = re.findall(port_pattern, tplink_device_dump,re.DOTALL)
print("myswitch=",mymac, "- link_switch=", mylink, "myport=", myport)
return(0)
'''
with open('/tmp/switch.bigroom.dump.json', 'r') as file:
foo3 = file_content = file.read()
print("bigroom", end="")
extract_uplinks_mac_and_ports(foo3)
with open('/tmp/switch.office.dump.json', 'r') as file:
foo4 = file_content = file.read()
print("office", end="")
extract_uplinks_mac_and_ports(foo4)
'''
import netifaces
gw = netifaces.gateways()
print(gw['default'][netifaces.AF_INET][0])
d = {'a': ['0', 'Arthur'], 'b': ['foo', 'Belling']}
print(d.items())
print(d.keys())
print(d.values())
foo = 2
#while foo > 0:
# foo = 'toto'
print("foo is ",foo)
if foo in ( 'bar', '', 'null'):
print("foo is bar")
else:
print("foo is not bar")
foo='192-168-0-150.local'
bar = foo.split('.')[0]
print("bar=",bar,"-")
bar2 = 'toto'
print("bar2=",bar2,"-")
import concurrent.futures
import time
import random
def phello(arg):
print('running phell',arg)
delay = random.uniform(0, 6)
time.sleep(delay)
return f"parallel hello : {arg}", delay
def testparalel():
arguments = ["Alice", "Bob", "Charlie", "David"]
results = {}
results2 = {}
para = 10
# Using ThreadPoolExecutor for parallel execution
with concurrent.futures.ThreadPoolExecutor(max_workers=para) as executor:
# Submit tasks to the executor
future_to_arg = {executor.submit(phello, arg): arg for arg in arguments}
# Wait for all futures to complete
done, _ = concurrent.futures.wait(future_to_arg)
# Retrieve results
for future in done:
arg = future_to_arg[future]
try:
result, result2 = future.result()
results[arg] = result
results2[arg] = result2
except Exception as exc:
print(f"{arg} generated an exception: {exc}")
# Print results after all threads have completed
print("All threads completed. Results:")
for arg, result in results.items():
print(f"arg:{arg}, result={results[arg]}, result2={results2[arg]}")
testparalel()