软件设计讲解
2025-11-05
软件框架
软件设计图
代码讲解
等待网络就绪
main.py的wait_network_ready类是计算最大等待次数(WAIT_NETWORK_READY_S秒除以5秒间隔。如果网络就绪则返回True,超时则返回False。
def wait_network_ready():
wait_cnt = WAIT_NETWORK_READY_S / 5
is_ready = False
while wait_cnt:
lte = dataCall.getInfo(1, 0)
if lte[2][0] == 1:
is_ready = True
break
utime.sleep(5)
wait_cnt -= 1
return is_ready
创建应用实例
create_app创建应用实例的函数。主要功能包括:创建Application应用对象,设置名称和版本初始化应用配置,加载指定路径的配置文件初始化各种服务模块(QTH客户端、GNSS服务、电池服务等),返回配置好的应用实例。
def create_app(name="SimpliKit", version="1.0.0", config_path="/usr/config.json"):
_app = Application(name, version)
_app.config.init(config_path)
qth_client.init_app(_app)
#lbs_service.init_app(_app)
gnss_service.init_app(_app)
battery_service.init_app(_app)
#sensor_service.init_app(_app)
return _app
GNSS
这是一个用于处理GNSS(全球导航卫星系统)定位数据的Python服务模块,主要用于物联网设备的定位追踪。
NMEA数据解析类 - NmeaDict
NMEA是GPS设备使用的标准数据格式:
- load() 方法:解析原始NMEA数据,进行CRC校验
- checksum()方法:验证NMEA语句的完整性
class NmeaDict(dict):
@classmethod
def load(cls, raw):
items = {}
for line in raw.split('\r\n'):
try:
tail_index = line.rfind('*')
if tail_index == -1:
continue
head_index = line.rfind('$', 0, tail_index)
if head_index == -1:
continue
crc = int(line[tail_index + 1:tail_index + 3], 16)
if cls.checksum(line[head_index + 1:tail_index]) != crc:
raise ValueError('CRC check failed')
cmdlist = line[head_index:tail_index].split(',')
# print(line[head_index:])
if cmdlist[0] not in items:
items[cmdlist[0]] = []
items[cmdlist[0]].append(line)
except Exception as e:
# logger.debug('parse nmea line error: {}; pass it: {}'.format(e, line))
continue
return cls(items)
@staticmethod
def checksum(data):
crc = ord(data[0])
for one in (ord(_) for _ in data[1:]):
crc ^= one
return crc
核心服务类 - GnssService
初始化和配置
管理GNSS模块的初始化和状态控制
与应用程序集成 (init_app)
设置定位更新间隔 (update_interval)
class GnssService(object): def __init__(self, app=None): self.interval = 300 self.__gnss = quecgnss if app is not None: self.init_app(app) def __str__(self): return '{}'.format(type(self).__name__) def init_app(self, app): self.event = app.event self.gnss_sleep_event = app.gnss_sleep_event self.interval = app.config["SLEEP_INTERVAL_SECONDS"] app.register('gnss_service', self) def load(self): logger.info('loading {} extension, init quecgnss will take some seconds'.format(self)) result = self.init() logger.info('{} init gnss res: {}'.format(self, result)) if result: Thread(target=self.start_update).start() def init(self): if self.__gnss.init() != 0: logger.warn('{} gnss init FAILED'.format(self)) return False return True def status(self): # 0 int GNSS模块处于关闭状态 # 1 int GNSS模块固件升级中 # 2 int GNSS模块定位中,这种模式下即可开始读取GNSS定位数据,定位数据是否有效需要用户获取到定位数据后,解析对应语句来判断,比如判断GNRMC语句的status是 A 还是 V,A 表示定位有效,V表示定位无效。 return self.__gnss.get_state() def enable(self, flag=True): return self.__gnss.gnssEnable(bool(flag)) == 0 def read(self, size=4096): raw = self.__gnss.read(size) if raw != -1: size, data = raw # KHK #logger.debug('gnss read raw {} bytes data:\n{}'.format(size, data)) return NmeaDict.load(data) def check_gnss_signal(self, nmea_dict): snr_threshold = 15 min_sats = 3 has_3d_fix = False if "$GNGSA" in nmea_dict: for line in nmea_dict["$GNGSA"]: parts = line.split(",") if len(parts) > 2 and (parts[2] == "3" or parts[2] == "2"): has_3d_fix = True break if not has_3d_fix: return False snrs = [] def extract_snrs(lines): for line in lines: parts = line.split(",") i = 4 while i + 3 < len(parts): snr_str = parts[i + 3] if snr_str.isdigit(): snrs.append(int(snr_str)) i += 4 if "$GPGSV" in nmea_dict: extract_snrs(nmea_dict["$GPGSV"]) if "$GBGSV" in nmea_dict: extract_snrs(nmea_dict["$GBGSV"]) if "$GAGSV" in nmea_dict: extract_snrs(nmea_dict["$GAGSV"]) # count satelites with SNR > 15 count = 0 for snr in snrs: if snr > snr_threshold: count += 1 if count >= min_sats: return True return False def update_interval(self,interval): self.interval = interval
QTH平台客户端
QthClient 类是一个用于与 QTH(Quantum Technology Hub)平台进行通信的客户端类。它负责初始化、启动和管理与 QTH 平台的连接,并处理来自平台的各种事件和回调。
logger = getLogger(__name__)
class QthClient(object):
def __init__(self, app=None):
self.opt_lock = Lock()
if app:
self.init_app(app)
def __enter__(self):
self.opt_lock.acquire()
return self
def __exit__(self, *args, **kwargs):
self.opt_lock.release()
def init_app(self, app):
app.register("qth_client", self)
Qth.init()
Qth.setProductInfo(app.config["QTH_PRODUCT_KEY"], app.config["QTH_PRODUCT_SECRET"])
Qth.setServer(app.config["QTH_SERVER"])
Qth.setEventCb(
{
"devEvent": self.eventCallback,
"recvTrans": self.recvTransCallback,
"recvTsl": self.recvTslCallback,
"readTsl": self.readTslCallback,
"readTslServer": self.recvTslServerCallback,
"ota": {
"otaPlan":self.otaPlanCallback,
"fotaResult":self.fotaResultCallback
}
}
)
def load(self):
self.start()
def start(self):
Qth.start()
while not self.isStatusOk():
utime.sleep(3)
def stop(self):
Qth.stop()
def sendTsl(self, mode, value):
return Qth.sendTsl(mode, value)
def isStatusOk(self):
return Qth.state()
def sendLbs(self, lbs_data):
return Qth.sendOutsideLocation(lbs_data)
def sendGnss(self, nmea_data):
return Qth.sendOutsideLocation(nmea_data)
def eventCallback(self, event, result):
logger.info("dev event:{} result:{}".format(event, result))
if(2== event and 0 == result):
Qth.otaRequest()
def recvTransCallback(self, value):
ret =Qth.sendTrans(1, value)
logger.info("recvTrans value:{} ret:{}".format(value, ret))
def recvTslCallback(self, value):
logger.info("recvTsl:{}".format(value))
for cmdId, val in value.items():
logger.info("recvTsl {}:{}".format(cmdId, val))
if cmdId == 8:
CurrentApp().gnss_service.update_interval(val)
CurrentApp().lbs_service.update_interval(val)
def readTslCallback(self, ids, pkgId):
logger.info("readTsl ids:{} pkgId:{}".format(ids, pkgId))
value=dict()
temp1, humi =CurrentApp().sensor_service.get_temp1_and_humi()
press, temp2 = CurrentApp().sensor_service.get_press_and_temp2()
r,g,b = CurrentApp().sensor_service.get_rgb888()
for id in ids:
if 3 == id:
value[3]=temp1
elif 4 == id:
value[4]=humi
elif 5 == id:
value[5]=temp2
elif 6 == id:
value[6]=press
elif 7 == id:
value[7]={1:r, 2:g, 3:b}
Qth.ackTsl(1, value, pkgId)
def recvTslServerCallback(self, serverId, value, pkgId):
logger.info("recvTslServer serverId:{} value:{} pkgId:{}".format(serverId, value, pkgId))
Qth.ackTslServer(1, serverId, value, pkgId)
def otaPlanCallback(self, plans):
logger.info("otaPlan:{}".format(plans))
Qth.otaAction(1)
def fotaResultCallback(self, comp_no, result):
logger.info("fotaResult comp_no:{} result:{}".format(comp_no, result))
def sotaInfoCallback(self, comp_no, version, url, md5, crc):
logger.info("sotaInfo comp_no:{} version:{} url:{} md5:{} crc:{}".format(comp_no, version, url, md5, crc))
# 当使用url下载固件完成,且MCU更新完毕后,需要获取MCU最新的版本信息,并通过setMcuVer进行更新
Qth.setMcuVer("MCU1", "V1.0.0", self.sotaInfoCallback, self.sotaResultCallback)
def sotaResultCallback(comp_no, result):
logger.info("sotaResult comp_no:{} result:{}".format(comp_no, result))
电池服务
battery_service.init_app初始化电池服务模块。
logger = getLogger(__name__)
BATTERY_OCV_TABLE = {
"nix_coy_mnzo2": {
55: {
4152: 100, 4083: 95, 4023: 90, 3967: 85, 3915: 80, 3864: 75, 3816: 70, 3773: 65, 3737: 60, 3685: 55,
3656: 50, 3638: 45, 3625: 40, 3612: 35, 3596: 30, 3564: 25, 3534: 20, 3492: 15, 3457: 10, 3410: 5, 3380: 0,
},
20: {
4143: 100, 4079: 95, 4023: 90, 3972: 85, 3923: 80, 3876: 75, 3831: 70, 3790: 65, 3754: 60, 3720: 55,
3680: 50, 3652: 45, 3634: 40, 3621: 35, 3608: 30, 3595: 25, 3579: 20, 3548: 15, 3511: 10, 3468: 5, 3430: 0,
},
0: {
4147: 100, 4089: 95, 4038: 90, 3990: 85, 3944: 80, 3899: 75, 3853: 70, 3811: 65, 3774: 60, 3741: 55,
3708: 50, 3675: 45, 3651: 40, 3633: 35, 3620: 30, 3608: 25, 3597: 20, 3585: 15, 3571: 10, 3550: 5, 3500: 0,
},
},
}
class BatteryService(object):
def __init__(self, app=None):
self.interval = 30
self.__net = net
if app is not None:
self.init_app(app, battery_ocv="nix_coy_mnzo2")
def __str__(self):
return '{}'.format(type(self).__name__)
def init_app(self, app, battery_ocv="nix_coy_mnzo2"):
self.gnss_sleep_event = app.gnss_sleep_event
self.interval = app.config["SLEEP_INTERVAL_SECONDS"]
self.__energy = 100
self.__temp = 30
self.__vbatt_count = 100
if not BATTERY_OCV_TABLE.get(battery_ocv):
raise TypeError("Battery OCV %s is not support." % battery_ocv)
self.__battery_ocv = battery_ocv
app.register('battery_service', self)
def load(self):
logger.info('loading {} extension, init battery will take some seconds'.format(self))
Thread(target=self.start_update).start()
def __get_soc_from_dict(self, key, volt_arg):
"""Get battery energy from map"""
if BATTERY_OCV_TABLE[self.__battery_ocv].get(key):
volts = sorted(BATTERY_OCV_TABLE[self.__battery_ocv][key].keys(), reverse=True)
pre_volt = 0
volt_not_under = 0 # Determine whether the voltage is lower than the minimum voltage value of soc.
for volt in volts:
if volt_arg > volt:
volt_not_under = 1
soc1 = BATTERY_OCV_TABLE[self.__battery_ocv][key].get(volt, 0)
soc2 = BATTERY_OCV_TABLE[self.__battery_ocv][key].get(pre_volt, 0)
break
else:
pre_volt = volt
if pre_volt == 0: # Input Voltarg > Highest Voltarg
return soc1
elif volt_not_under == 0:
return 0
else:
return soc2 - (soc2 - soc1) * (pre_volt - volt_arg) // (pre_volt - volt)
def __get_soc(self, temp, volt_arg):
"""Get battery energy by temperature and voltage"""
if temp > 30:
return self.__get_soc_from_dict(55, volt_arg)
elif temp < 10:
return self.__get_soc_from_dict(0, volt_arg)
else:
return self.__get_soc_from_dict(20, volt_arg)
def __get_power_vbatt(self):
"""Get vbatt from power"""
return int(sum([Power.getVbatt() for i in range(self.__vbatt_count)]) / self.__vbatt_count)
def set_temp(self, temp):
"""Set now temperature."""
if isinstance(temp, int) or isinstance(temp, float):
self.__temp = temp
return True
return False
def start_update(self):
while True:
if quecgnss.getPriority():
self.__energy = self.__get_soc(self.__temp, self.__get_power_vbatt())
data = {4: self.__energy}
if data:
with CurrentApp().qth_client:
for _ in range(3):
if CurrentApp().qth_client.sendTsl(1, data):
logger.debug("send battery data to qth server success")
break
#self.gnss_sleep_event.set() # Notify GNSS service to wake up
utime.sleep(self.interval)
else:
utime.sleep(0.1)