# ------------------ mypass.json
{
"info":{
"password": "zxc"
}
}
# ------------------ k.py
import os,io,string
import hashlib
from hashlib import sha256
from typing import Dict, List
from urllib import parse
import timeit
import configparser
import json
import logging
import time
from datetime import datetime
import requests
from requests import Session
from rich.live import Live
from rich.table import Table
from rich.panel import Panel
from rich.layout import Layout
from rich import box
# https://i...content-available-to-author-only...e.com/KsRWCW
def file_load(filename, encoding='utf8'):
    """Read a text file and return its whole content as one string.

    Args:
        filename: Path of the file to read.
        encoding: Text encoding used to decode the file (default ``'utf8'``).

    Returns:
        The decoded content of the file.

    Raises:
        OSError: If the file cannot be opened or read.
    """
    # The ``with`` statement closes the file on exit (also on error), so no
    # explicit close() call is needed.
    with io.open(filename, encoding=encoding) as f_in:
        return f_in.read()
class HiTimer():
    """Small stopwatch based on ``timeit.default_timer``.

    ``startInterval`` holds the moment of the last (re)start and
    ``endInterval`` the moment of the last elapsed-time query.
    """

    def __init__(self):
        self.startInterval = timeit.default_timer()
        self.endInterval = timeit.default_timer()

    def start(self):
        """Restart the stopwatch from the current moment."""
        self.startInterval = timeit.default_timer()

    def _lap(self):
        # Record "now" as the end mark and return the span since the start.
        self.endInterval = timeit.default_timer()
        return self.endInterval - self.startInterval

    def elapsedsec(self):
        """Return the seconds elapsed since the last start."""
        return self._lap()

    def elapsedms(self):
        """Return the milliseconds elapsed since the last start."""
        return self._lap() * 1000.0
class KeeneticClient:
    """HTTP client for the admin endpoint of a Keenetic router.

    Use it as a context manager: entering opens a ``requests`` session and
    leaving closes it.  Unless ``skip_auth`` is set, every request first
    re-checks authorisation against ``<admin_endpoint>/auth`` and logs in
    with the challenge-response scheme when the router answers 401.

    Args:
        admin_endpoint: Base URL of the router admin interface.
        skip_auth: When true, no authorisation check or login is performed.
        login: Account name used for the login.
        password: Account password used for the login.
        timeout: Timeout in seconds applied to every HTTP request.
    """

    def __init__(self, admin_endpoint: str, skip_auth: bool, login: str, password: str,
                 timeout: float = 2):
        self._admin_endpoint = admin_endpoint
        self._skip_auth = skip_auth
        self._login = login
        self._password = password
        self._timeout = timeout
        # Created in __enter__; kept as None so that use outside a ``with``
        # block fails with a clear error instead of an AttributeError.
        self._session = None

    def __enter__(self):
        self._session = Session()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self._session is not None:
            self._session.close()
            self._session = None

    def _require_session(self):
        """Return the open session or raise ConnectionError if there is none."""
        if self._session is None:
            raise ConnectionError("No keenetic session: use KeeneticClient inside a 'with' block.")
        return self._session

    def _url(self, command: str) -> str:
        """Build the request URL; spaces in ``command`` become path separators."""
        return f"{self._admin_endpoint}{command.replace(' ', '/')}"

    def _auth(self) -> bool:
        """Make sure the session is authorised.

        Returns:
            True when authorisation is skipped, already valid, or the login
            succeeded.

        Raises:
            ConnectionError: On an unexpected status, a 401 without the
                challenge headers, or a rejected login.
        """
        if self._skip_auth:
            return True
        session = self._require_session()
        auth_endpoint = f"{self._admin_endpoint}/auth"
        check_auth_response = session.get(auth_endpoint, timeout=self._timeout)
        if check_auth_response.status_code == 200:
            return True
        if check_auth_response.status_code != 401:
            raise ConnectionError(
                f"Failed to check authorisation, status unknown ({check_auth_response.status_code})")

        ndm_challenge = check_auth_response.headers.get('X-NDM-Challenge')
        ndm_realm = check_auth_response.headers.get('X-NDM-Realm')
        if ndm_challenge is None or ndm_realm is None:
            # Without both headers the response hash cannot be computed.
            raise ConnectionError(
                "Keenetic authorisation failed. Missing X-NDM-Challenge/X-NDM-Realm header")
        # Login hash: sha256(challenge + md5("login:realm:password")).
        md5 = hashlib.md5(
            (self._login + ':' + ndm_realm + ':' + self._password).encode('utf-8')).hexdigest()
        sha = sha256((ndm_challenge + md5).encode('utf-8')).hexdigest()
        auth_response = session.post(
            auth_endpoint, timeout=self._timeout, json={'login': self._login, 'password': sha})
        if auth_response.status_code == 200:
            return True
        raise ConnectionError(f"Keenetic authorisation failed. Status {auth_response.status_code}")

    def connect(self) -> bool:
        """Check (and if needed perform) authorisation; return True on success."""
        if not self._auth():
            raise ConnectionError("No keenetic connection.")
        return True

    def do_post(self, command: str, params: Dict) -> Dict:
        """POST ``params`` as a JSON body to ``command`` and return the decoded JSON.

        Raises:
            ConnectionError: If authorisation fails or no session is open.
            KeeneticApiException: If the router answers with a non-200 status.
        """
        # Example of a multi-command payload kept from the original notes
        # (not exercised by this file):
        #   [{"interface": {"Dsl0": {"up": {"no": True}}}},
        #    {"system": {"configuration": {"save": True}}}]
        self.connect()
        session = self._require_session()
        post_resp = session.post(url=self._url(command), json=params, timeout=self._timeout)
        if post_resp.status_code != 200:
            raise KeeneticApiException(post_resp.status_code, post_resp.text)
        return post_resp.json()

    def do_get(self, command: str, params: Dict) -> Dict:
        """GET ``command`` with ``params`` as query string and return the decoded JSON.

        Raises:
            ConnectionError: If authorisation fails or no session is open.
            KeeneticApiException: If the router answers with a non-200 status.
        """
        self.connect()
        session = self._require_session()
        url = self._url(command)
        if params:
            # Only add the query separator when there is something to send.
            url += "?" + parse.urlencode(params)
        r = session.get(url, timeout=self._timeout)
        if r.status_code != 200:
            raise KeeneticApiException(r.status_code, r.text)
        return r.json()
class KeeneticApiException(Exception):
    """Error carrying a status code and the accompanying response text."""

    def __init__(self, status_code: int, response_text: str):
        # Forwarding both values keeps ``args`` (and therefore ``str(exc)``)
        # exactly as Exception already records them on construction.
        super().__init__(status_code, response_text)
        self.response_text = response_text
        self.status_code = status_code
# def ():
def tty_parse_lines(lines: list):
    """Split modem tty output lines into a command prefix and value fields.

    Each line is split on commas and every field is stripped.  When the first
    field contains a colon, the text before it is taken as the command name
    and the text right after it as the first value.

    Args:
        lines: Output lines (strings) of a tty command.

    Returns:
        A tuple ``(entries, has_ok)``.  ``entries`` holds one dict per line
        with the keys ``src`` (the untouched line), ``cmd``, ``first_val``
        and ``vals`` (all stripped fields; ``vals[0]`` still includes the
        ``cmd:`` prefix).  ``has_ok`` is True when some line is ``OK``.
    """
    parse_res = []
    has_ok = False
    for line in lines:
        vals = [field.strip() for field in line.split(',')]
        prefix = vals[0].split(':')
        cmdname = ""
        cmdfirstv = ""
        if len(prefix) > 1:
            cmdname = prefix[0].strip()
            cmdfirstv = prefix[1].strip()
        # Compare the stripped line, consistent with the stripped fields, so
        # an "OK" with stray whitespace or a trailing "\r" is still detected.
        if line.strip() == "OK":
            has_ok = True
        parse_res.append({"src": line, "cmd": cmdname, "first_val": cmdfirstv, "vals": vals})
    return parse_res, has_ok
class AvgMeter():
    """Download-speed bookkeeping.

    Keeps three independent measurements:

    * a rate derived from a growing received-bytes counter
      (``set_rxbytes`` / ``get_rx_speed_bytes``),
    * the mean of the most recent speed samples (``add_download_val`` /
      ``avg_download_speed``),
    * the mean of a longer run of speed samples that restarts once it is
      full (``add_download_global_val`` / ``avg_download_global_speed``).
    """

    _LAST_WINDOW = 7          # samples kept for the "last" average
    _GLOBAL_WINDOW = 100      # samples accumulated before the global average restarts
    _RX_MIN_INTERVAL = 1.3    # seconds before a byte-counter rate is computed
    _RX_RESTART_INTERVAL = 3  # seconds after which the byte-counter window restarts

    def __init__(self):
        self.dload_global_timer = HiTimer()
        self.dload_timer = HiTimer()
        self.rx_timer = HiTimer()
        self.dload_vals = []
        self.dload_global_sum = 0
        self.dload_global_count = 0
        self.rxbytes_start = 0
        self.rxbytes_cur = 0
        self.rxspeed = 0

    def _restart_rx_window(self, rxbytes):
        # Begin a new measuring window at the given counter value.
        self.rx_timer.start()
        self.rxbytes_start = rxbytes
        self.rxbytes_cur = rxbytes

    def set_rxbytes(self, rxbytes):
        """Feed the current received-bytes counter and update the byte rate.

        The first call (start value still 0) only opens the window.  A counter
        that went backwards also just reopens the window, so that no negative
        speed is reported.
        """
        if self.rxbytes_start == 0 or rxbytes < self.rxbytes_start:
            self._restart_rx_window(rxbytes)
            self.rxspeed = 0
            return
        elapsed_sec = self.rx_timer.elapsedsec()
        if elapsed_sec >= self._RX_MIN_INTERVAL:
            self.rxbytes_cur = rxbytes
            self.rxspeed = (self.rxbytes_cur - self.rxbytes_start) / elapsed_sec
        if elapsed_sec >= self._RX_RESTART_INTERVAL:
            self._restart_rx_window(rxbytes)

    def get_rx_speed_bytes(self):
        """Return the last computed byte-counter rate (counter units per second)."""
        return self.rxspeed

    def start_download_timer(self):
        """Restart the short-term timer and drop the recent samples."""
        self.dload_timer.start()
        self.dload_vals = []

    def start_download_global_timer(self):
        """Restart the long-term timer and clear the global accumulator."""
        self.dload_global_timer.start()
        self.dload_global_sum = 0
        self.dload_global_count = 0

    def add_download_val(self, dload_speed):
        """Append a speed sample, keeping only the most recent ones."""
        if len(self.dload_vals) >= self._LAST_WINDOW:
            del self.dload_vals[0]
        self.dload_vals.append(dload_speed)

    def add_download_global_val(self, dload_speed):
        """Add a speed sample to the global accumulator, restarting it when full."""
        if self.dload_global_count >= self._GLOBAL_WINDOW:
            self.dload_global_sum = 0
            self.dload_global_count = 0
        self.dload_global_sum += dload_speed
        self.dload_global_count += 1

    def avg_download_speed(self):
        """Return the mean of the recent samples, or 0.0 when there are none."""
        if not self.dload_vals:
            return 0.0
        return sum(self.dload_vals) / len(self.dload_vals)

    def avg_download_global_speed(self):
        """Return the mean of the accumulated samples, or 0.0 when there are none."""
        if self.dload_global_count == 0:
            return 0.0
        return self.dload_global_sum / self.dload_global_count
class MetrStat():
    """Counts how often each modulation was seen and reports the dominant one.

    Args:
        reset_freq: The counters are cleared when ``add_val`` is called after
            more than this many additions have been recorded.
    """

    def __init__(self, reset_freq=35):
        self.reset_freq = reset_freq
        self.stats = {}
        self.count = 0
        self.reset()

    def reset(self):
        """Zero all modulation counters and the addition count."""
        self.stats = {"qpsk": 0, "16qam": 0, "64qam": 0, "256qam": 0, "other": 0}
        self.count = 0

    def add_val(self, stat_name, incr_val):
        """Increase the counter ``stat_name`` by ``incr_val``.

        Raises:
            KeyError: If ``stat_name`` is not one of the known counters.
        """
        if self.count > self.reset_freq:
            self.reset()
        self.stats[stat_name] += incr_val
        self.count += 1

    def best_modulation(self):
        """Return ``(name, value, percent)`` of the counter with the highest value.

        ``percent`` is the share of that counter in the sum of all counters;
        it is 0.0 while nothing has been recorded (instead of dividing by zero).
        """
        best_stat = max(self.stats, key=self.stats.get)
        max_val = self.stats[best_stat]
        sum_all = sum(self.stats.values())
        percent = (max_val / sum_all) * 100 if sum_all else 0.0
        return best_stat, max_val, percent
# Modulation code reported by the modem -> (display label, MetrStat counter).
_MODULATIONS = {
    "0": ("BPSK", "other"),
    "1": ("QPSK", "qpsk"),
    "2": ("16QAM", "16qam"),
    "3": ("64QAM", "64qam"),
    "4": ("256QAM", "256qam"),
}
# MIMO code reported by the modem -> display label.
_MIMO_LABELS = {"1": "1x1", "2": "2x2", "3": "4x4"}


def _send_tty_command(kc, at_command):
    """Send an AT command through the UsbLte0 tty and return the parsed lines.

    Raises KeeneticApiException(910, message) when the router reports an error
    status for the command.
    """
    res = kc.do_post("/rci/", {"parse": f"interface UsbLte0 tty send {at_command}"})
    status = res["parse"]["status"][0]
    if status["status"] == "error":
        raise KeeneticApiException(910, status["message"])
    parsed, _has_ok = tty_parse_lines(res["parse"]["tty-out"])
    return parsed


def _first_csq_value(csq_data):
    """Return the first value of the ``+CSQ`` line (first line as fallback, else "")."""
    for entry in csq_data:
        if entry["cmd"] == "+CSQ":
            return entry["first_val"]
    return csq_data[0]["first_val"] if csq_data else ""


def _carrier_bands(ca_data, stats):
    """Turn parsed AT+GTCAINFO? lines into a list of band dicts.

    Field layout as noted by the original author (10 values for PCC, 12 for
    SCC; the first value shares a field with the ``PCC:`` / ``SCC n:`` prefix):

      PCC:   band, physical cellId, earfcn, dl_bandwidth, ?, dl_mimo, ul_mimo,
             dl_modulation, ul_modulation, rsrp
             e.g. "PCC:103,489,1275,75,75,1,1,2,3,-64"
      SCC n: scell_state, ul_configured, band, physical cellId, earfcn,
             dl_bandwidth, ul_bandwidth, dl_mimo, ul_mimo, dl_modulation,
             ul_modulation, rsrp
             e.g. "SCC 1:2,1,107,427,3048,100,100,2,1,3,3,-81"

    Every accepted line also feeds its modulation into ``stats``.
    """
    bands = []
    for ca in ca_data:
        if ca["cmd"] == "+GTCAINFO" or ca["src"].strip() in ("", "OK"):
            continue
        vals = ca["vals"]
        # SCC lines carry two extra leading fields, shifting everything by 2.
        offset = 0 if ca["cmd"] == "PCC" else 2
        if len(vals) < 10 + offset:
            # Too short to be a carrier line (echo, unexpected text): skip it
            # instead of failing with an IndexError.
            continue
        raw_band = ca["first_val"] if offset == 0 else vals[offset]
        try:
            # The reported band is 100 above the displayed number
            # (sample: 103 is shown as Band 3).
            bandnum = str(int(raw_band) - 100)
        except ValueError:
            continue

        label, stat_name = _MODULATIONS.get(vals[7 + offset], ("U?", "other"))
        stats.add_val(stat_name, 1)
        best_stat, _max_val, percent = stats.best_modulation()

        band = {}
        band["name"] = "Band " + bandnum
        band["num"] = bandnum
        band["pci"] = vals[1 + offset]
        band["modulation"] = label
        band["calc_modul_best"] = best_stat
        band["calc_modul_percent"] = percent
        band["mimo"] = _MIMO_LABELS.get(vals[5 + offset], "")
        band["rsrp"] = vals[9 + offset]
        bands.append(band)
    return bands


def router_info(kc: "KeeneticClient", speedmeter: "AvgMeter", stats: "MetrStat"):
    """Query the router for the UsbLte0 modem state and return display data.

    Performs one GET (``/rci/show/interface``) and three POSTs to ``/rci/``
    (``AT+GTCAINFO?``, ``AT+CSQ`` and ``show interface UsbLte0 stat``), feeds
    the speed readings into ``speedmeter`` and the modulations into ``stats``.

    Args:
        kc: Connected router client.
        speedmeter: Accumulator for download speed averages (mutated).
        stats: Modulation statistics (mutated).

    Returns:
        ``(ireq, ibs, ispeed, imetric, icarrier)``: request status flags,
        base-station/modem state, speeds (the reported speeds divided by
        1,000,000), signal metrics and ``{"bands": [...]}`` carrier info.

    Raises:
        KeeneticApiException: Code 901 when the link is down or the modem is
            not connected, code 910 when a tty command reports an error, or
            whatever the client raises for a failed HTTP request.
    """
    ireq = {"auth": True, "tty_error": "", "modem_error": "", "errmsg": ""}

    res = kc.do_get("/rci/show/interface", {})
    ipp = res["UsbLte0"]
    if ipp["link"] == "down" or ipp["connected"] == "no":
        # Integer code, matching the exception's declared ``status_code: int``.
        raise KeeneticApiException(901, f"link {ipp['link']} ,connected {ipp['connected']}")

    ca_data = _send_tty_command(kc, "AT+GTCAINFO?")
    csq_data = _send_tty_command(kc, "AT+CSQ")

    res = kc.do_post("/rci/", {"parse": "show interface UsbLte0 stat"})
    speed_res = res["parse"]
    rxbytes = speed_res["rxbytes"]
    rxspeed = speed_res["rxspeed"]
    txspeed = speed_res["txspeed"]

    now = datetime.now()
    ibs = {}
    ibs["datetime"] = now
    ibs["datetext"] = f"{now}"
    ibs["mstate"] = "connected" if ipp["connected"] == "yes" else "no connection"
    ibs["mlink"] = ipp["link"]
    ibs["t"] = ipp['temperature']
    ibs["enbid"] = ipp['enb-id']
    ibs["cell"] = ipp['bssid']
    ibs["pci"] = ipp['phy-cell-id']
    ibs["sector"] = ipp['sector-id']

    # Each sample is added before its average is read, so the averages are
    # never taken over an empty set.
    speedmeter.set_rxbytes(rxbytes)
    avg_dspeed_bytes = speedmeter.get_rx_speed_bytes()
    speedmeter.add_download_val(rxspeed / 1000000)
    avg_dspeed = speedmeter.avg_download_speed()
    speedmeter.add_download_global_val(rxspeed / 1000000)
    avg_dspeed_global = speedmeter.avg_download_global_speed()

    ispeed = {}
    ispeed["down_quick"] = (avg_dspeed_bytes * 8.0) / 1000000.0  # bytes/s -> bits/s, then /1e6
    ispeed["down"] = rxspeed / 1000000
    ispeed["up"] = txspeed / 1000000
    ispeed["avgtotal"] = avg_dspeed_global
    ispeed["avglast"] = avg_dspeed
    ispeed["ping"] = "10"  # fixed placeholder value, nothing is measured here

    imetric = {}
    imetric["rsrq"] = ipp['rsrq']
    imetric["rsrp"] = ipp['rsrp']
    imetric["rssi"] = ipp['rssi']
    imetric["sinr"] = ipp['cinr']
    imetric["csq"] = _first_csq_value(csq_data)
    imetric["signal-level"] = ipp["signal-level"]

    icarrier = {"bands": _carrier_bands(ca_data, stats)}
    return ireq, ibs, ispeed, imetric, icarrier
# Rich styles shared by the panels below.
_STYLE_CSQ = "bold red"
_STYLE_RSRQ = "bold"
_STYLE_RSRP = "bold yellow"
_STYLE_RSSI = "bold bright_cyan"
_STYLE_SINR = "bold green"
# Style used for any modulation / MIMO label without an entry of its own.
_STYLE_FALLBACK = "bold yellow"
_MODULATION_STYLES = {
    "QPSK": "default",
    "16QAM": "cyan",
    "64QAM": "bold green",
    "256QAM": "bold magenta",
}
_MIMO_STYLES = {
    "1x1": "default",
    "2x2": "bold green",
    "4x4": "bold magenta",
}


def _header_style(style):
    # Column headers use the value style without the "bold" attribute.
    return style.replace("bold", "")


def _status_grid(ibs):
    """Build the two-line header grid: cell identity, state, temperature, time."""
    table = Table.grid()
    table.add_column("1", width=40, justify="center")
    table.add_column("2", width=30)
    table.add_column("3", width=40)
    table.add_row(f'Cell = [bold yellow]{ibs["enbid"]} : {ibs["cell"]}',
                  f'PCI/sector = {ibs["pci"]} : {ibs["sector"]}')
    state_colour = "cyan" if ibs["mstate"] == "connected" else "red"
    state_data = f'[bold {state_colour}] {ibs["mstate"].upper()}'
    table.add_row(state_data, f'T = [bold]{ibs["t"]}C', ibs["datetext"])
    return table


def _speed_table(ispeed):
    """Build the download / average / upload speed table."""
    table = Table(box=None)
    table.add_column("Download", width=27, justify="center")
    table.add_column("Avg (last100/last10)", width=38, justify="center")
    table.add_column("Upload", width=15, justify="center")
    dspeed_quick = ispeed["down_quick"]
    dspeed = ispeed["down"]
    uspeed = ispeed["up"]
    avg_total = ispeed["avgtotal"]
    avg_last = ispeed["avglast"]
    table.add_row(f"[bold bright_green]{dspeed:.2f} (quick: {dspeed_quick:.2f}) mbit",
                  f"[bold cyan]{avg_total:.2f} / {avg_last:.2f} mbit",
                  f"[bold orange1]{uspeed:.2f} mbit")
    return table


def _metric_table(imetric):
    """Build the signal metric table (signal level, CSQ, RSRQ, RSRP, RSSI, SINR)."""
    table = Table(box=box.SQUARE)
    table.add_column("", width=10, justify="center")
    table.add_column("", width=10, justify="center")
    table.add_column("Signal-level", width=10, justify="center")
    table.add_column("CSQ", width=10, justify="center", header_style=_header_style(_STYLE_CSQ))
    table.add_column("RSRQ", width=10, justify="center", header_style=_header_style(_STYLE_RSRQ))
    table.add_column("RSRP", width=10, justify="center", header_style=_header_style(_STYLE_RSRP))
    table.add_column("RSSI", width=10, justify="center", header_style=_header_style(_STYLE_RSSI))
    table.add_column("SINR", width=10, justify="center", header_style=_header_style(_STYLE_SINR))
    table.add_row("", "",
                  f"{imetric['signal-level']}",
                  f"[{_STYLE_CSQ}]{imetric['csq']}",
                  f"[{_STYLE_RSRQ}]{imetric['rsrq']}",
                  f"[{_STYLE_RSRP}]{imetric['rsrp']}",
                  f"[{_STYLE_RSSI}]{imetric['rssi']}",
                  f"[{_STYLE_SINR}]{imetric['sinr']}")
    return table


def _carrier_table(icarrier):
    """Build the per-band table, one row for each entry of ``icarrier["bands"]``."""
    table = Table(box=box.SQUARE, show_lines=True)
    table.add_column("Band", width=10, justify="center")
    table.add_column("PCI", width=10, justify="center")
    table.add_column("Modulation", width=22, justify="center")
    table.add_column("Mimo", width=10, justify="center")
    table.add_column("RSRP", width=10, justify="center", header_style=_header_style(_STYLE_RSRP))
    for ca in icarrier["bands"]:
        modl = ca["modulation"]
        mimo = ca["mimo"]
        modl_style = _MODULATION_STYLES.get(modl, _STYLE_FALLBACK)
        mimo_style = _MIMO_STYLES.get(mimo, _STYLE_FALLBACK)
        table.add_row(f"[bold bright_cyan]{ca['num']}",
                      f"{ca['pci']}",
                      f"[{modl_style}]{modl} ({ca['calc_modul_best']} {int(ca['calc_modul_percent'])}%)",
                      f"[{mimo_style}]{mimo}",
                      f"[{_STYLE_RSRP}]{ca['rsrp']}")
    return table


def update_ui(live_data: "Live", layout: "Layout", ireq, ibs, ispeed, imetric, icarrier):
    """Refresh the four layout rows and push the layout to the live display.

    The f-strings avoid reusing the enclosing quote character inside
    replacement fields, which is a syntax error before Python 3.12.

    Args:
        live_data: Live display to update.
        layout: Layout with the sections ``row1`` .. ``row4``.
        ireq: Request status flags (accepted for interface parity, not shown).
        ibs: Base-station / modem state values.
        ispeed: Speed values.
        imetric: Signal metric values.
        icarrier: Dict whose ``bands`` entry lists the carrier band dicts.
    """
    layout["row1"].update(_status_grid(ibs))
    layout["row2"].update(Panel(_speed_table(ispeed), box=box.ROUNDED, border_style="grey42"))
    layout["row3"].update(Panel(_metric_table(imetric), title="METRIC", title_align="left",
                                box=box.ROUNDED, border_style="grey30"))
    layout["row4"].update(Panel(_carrier_table(icarrier), title="C A R R I E R I N F O",
                                title_align="left", box=box.ROUNDED, border_style="grey30"))
    live_data.update(layout)
def runmain():
    """Run the monitor: poll the router and redraw the live display forever.

    The password is read from ``mypass.json`` located next to this script
    (key ``info`` -> ``password``).  On any handled error the message is
    printed and, after a two second pause, the connection and the live
    display are set up again.
    """
    script_dir = os.path.dirname(os.path.realpath(__file__))
    # os.path.join keeps the path valid on every platform; a hard-coded
    # backslash only works on Windows.
    data = file_load(os.path.join(script_dir, "mypass.json"))
    cfg = json.loads(data)
    conn_info = cfg['info']
    admin_endpoint = "http://192.168.1.1:80"
    login = "admin"
    password = conn_info['password']

    connect_tested = False
    speedmeter = AvgMeter()
    stats = MetrStat()
    layout = Layout()
    layout.split_column(
        Layout(name="row1", size=2),
        Layout(name="row2", size=4),
        Layout(name="row3", size=7),
        Layout(name="row4"),
    )

    state_code = 0
    while True:
        # -1 is reserved as a stop signal; nothing below sets it at present.
        if state_code == -1:
            break
        try:
            # Every (re)start begins with fresh averages.
            speedmeter.start_download_timer()
            speedmeter.start_download_global_timer()
            with KeeneticClient(admin_endpoint, False, login, password) as kc:
                if not connect_tested:
                    kc.connect()
                    connect_tested = True
                with Live(layout) as live_data:
                    for _ in range(9666333):
                        ireq, ibs, ispeed, imetric, icarrier = router_info(kc, speedmeter, stats)
                        update_ui(live_data, layout, ireq, ibs, ispeed, imetric, icarrier)
                        time.sleep(0.25)
        except ConnectionError as e:
            print(f"connection error: {e}")
            print("restarting...")
            state_code = 404
        except KeeneticApiException as e:
            print(f"my keenetic error: {e}")
            if e.status_code == 503:
                print("503 - restarting...")
            else:
                print(f"{e.status_code} - restarting...")
            state_code = 901
        except Exception as e:
            # Top-level boundary: report the problem and keep the monitor alive.
            print(f"unknown error: {e}")
            state_code = 10
        time.sleep(2)
# Script entry point: start the monitor only when executed directly.
if __name__ == '__main__':
    runmain()