软件设计讲解

软件框架

软件设计图

代码讲解

等待网络就绪

main.py 中的 wait_network_ready 函数先计算最大等待次数(WAIT_NETWORK_READY_S 秒除以 5 秒的轮询间隔),随后每隔 5 秒查询一次拨号状态。如果网络就绪则返回 True,超时则返回 False。

def wait_network_ready():
    """Poll the data-call state until the network is up or the wait times out.

    ``dataCall.getInfo(1, 0)`` is queried once every 5 seconds. The number of
    polls is ``WAIT_NETWORK_READY_S`` divided by 5, rounded up.

    Returns:
        True as soon as a poll reports ``lte[2][0] == 1``; False once every
        poll has been used without seeing that state.
    """
    poll_interval_s = 5
    # Integer poll budget (ceiling division). The previous true division
    # produced a float that never reaches exactly 0 when the timeout is not a
    # multiple of 5, so ``while wait_cnt:`` could spin forever.
    remaining_polls = int((WAIT_NETWORK_READY_S + poll_interval_s - 1) // poll_interval_s)

    while remaining_polls > 0:
        lte = dataCall.getInfo(1, 0)
        # A non-sequence result (e.g. an integer error code) is treated as
        # "not ready yet" instead of crashing on the index access below.
        if isinstance(lte, (tuple, list)) and lte[2][0] == 1:
            return True

        utime.sleep(poll_interval_s)
        remaining_polls -= 1

    return False

创建应用实例

create_app 是创建应用实例的函数。主要功能包括:创建 Application 应用对象并设置名称和版本;初始化应用配置,加载指定路径的配置文件;初始化各个服务模块(QTH 客户端、GNSS 服务、电池服务等);最后返回配置好的应用实例。

def create_app(name="SimpliKit", version="1.0.0", config_path="/usr/config.json"):
    """Create the Application, load its configuration and attach the extensions.

    Args:
        name: Application name handed to ``Application``.
        version: Application version handed to ``Application``.
        config_path: Path of the configuration file passed to ``config.init``.

    Returns:
        The Application instance after every enabled extension has run
        ``init_app`` on it.
    """
    application = Application(name, version)
    application.config.init(config_path)

    # Attachment order: QTH client, GNSS service, battery service.
    # lbs_service and sensor_service are commented out in the original and
    # therefore stay disabled here.
    for extension in (qth_client, gnss_service, battery_service):
        extension.init_app(application)

    return application

GNSS

这是一个用于处理GNSS(全球导航卫星系统)定位数据的Python服务模块,主要用于物联网设备的定位追踪。

NMEA数据解析类 - NmeaDict

NMEA是GPS设备使用的标准数据格式:

  • load() 方法:解析原始 NMEA 数据,并对每条语句做校验和(checksum)校验,校验失败的语句会被丢弃
  • checksum() 方法:计算 NMEA 语句的异或校验和,供 load() 验证语句的完整性
class NmeaDict(dict):
    """Dictionary of NMEA sentences grouped by sentence identifier.

    Keys are the first comma-separated field of a sentence including the
    leading ``$`` (for example ``"$GNRMC"``). Each value is the list of raw
    lines that carried that identifier, in the order they were read.
    """

    @classmethod
    def load(cls, raw):
        """Parse a raw NMEA text buffer into an ``NmeaDict``.

        The buffer is split on ``'\\r\\n'``. A line is kept only when it
        contains ``$...*hh`` and the two hex digits after ``*`` equal the XOR
        checksum of the text between ``$`` and ``*``. Every other line is
        skipped silently, so a partially corrupted buffer still yields the
        sentences that are intact.

        Args:
            raw: Text containing zero or more NMEA sentences.

        Returns:
            A new instance mapping sentence identifier to a list of raw lines.
        """
        items = {}
        for line in raw.split('\r\n'):
            tail_index = line.rfind('*')
            if tail_index == -1:
                continue
            head_index = line.rfind('$', 0, tail_index)
            if head_index == -1:
                continue

            payload = line[head_index + 1:tail_index]
            if not payload:
                # "$*hh" has no sentence identifier to file the line under.
                continue

            try:
                crc = int(line[tail_index + 1:tail_index + 3], 16)
            except ValueError:
                # Missing or non-hex checksum digits: treat as a corrupt line.
                continue
            if cls.checksum(payload) != crc:
                continue

            sentence_id = line[head_index:tail_index].split(',')[0]
            items.setdefault(sentence_id, []).append(line)
        return cls(items)

    @staticmethod
    def checksum(data):
        """Return the XOR of the character codes in ``data`` (0 when empty)."""
        crc = 0
        for char in data:
            crc ^= ord(char)
        return crc

核心服务类 - GnssService

初始化和配置

  • 管理GNSS模块的初始化和状态控制

  • 与应用程序集成 (init_app)

  • 设置定位更新间隔 (update_interval)

    class GnssService(object):
        """Application extension that wraps the ``quecgnss`` module.

        It initialises and enables the GNSS module, reads NMEA data from it
        and judges whether the current satellite signal is usable.
        """

        def __init__(self, app=None):
            """Store defaults and optionally bind to ``app`` right away."""
            self.interval = 300
            self.__gnss = quecgnss

            if app is not None:
                self.init_app(app)

        def __str__(self):
            return '{}'.format(type(self).__name__)

        def init_app(self, app):
            """Bind to ``app``: copy its events and interval, then register."""
            self.event = app.event
            self.gnss_sleep_event = app.gnss_sleep_event
            self.interval = app.config["SLEEP_INTERVAL_SECONDS"]
            app.register('gnss_service', self)

        def load(self):
            """Initialise the GNSS module and start the update thread on success."""
            logger.info('loading {} extension, init quecgnss will take some seconds'.format(self))
            result = self.init()
            logger.info('{} init gnss res: {}'.format(self, result))
            if result:
                # NOTE(review): start_update is not shown in this excerpt; it
                # has to be defined elsewhere on this class.
                Thread(target=self.start_update).start()

        def init(self):
            """Return True when ``quecgnss.init()`` reports 0, else log and return False."""
            if self.__gnss.init() != 0:
                logger.warn('{} gnss init FAILED'.format(self))
                return False
            return True

        def status(self):
            """Return the GNSS module state as reported by ``get_state()``.

            State values:
                0: the GNSS module is switched off.
                1: the GNSS module firmware is being upgraded.
                2: the GNSS module is positioning. Data can be read in this
                   state, but whether a position is valid must be decided by
                   parsing the sentences, e.g. the GNRMC status field is
                   ``A`` for a valid fix and ``V`` for an invalid one.
            """
            return self.__gnss.get_state()

        def enable(self, flag=True):
            """Switch the GNSS module on or off; True when the call returned 0."""
            return self.__gnss.gnssEnable(bool(flag)) == 0

        def read(self, size=4096):
            """Read up to ``size`` bytes of NMEA data.

            Returns:
                An ``NmeaDict`` built from the data, or None when the
                underlying read returned -1.
            """
            raw = self.__gnss.read(size)
            if raw == -1:
                return None
            _, data = raw
            return NmeaDict.load(data)

        def check_gnss_signal(self, nmea_dict, snr_threshold=15, min_sats=3):
            """Decide whether the parsed NMEA data shows a usable signal.

            The signal counts as usable when a ``$GNGSA`` sentence reports fix
            mode ``2`` or ``3`` and at least ``min_sats`` satellites listed in
            the ``$GPGSV`` / ``$GBGSV`` / ``$GAGSV`` sentences have an SNR
            strictly above ``snr_threshold``.

            Args:
                nmea_dict: Mapping of sentence identifier to raw lines, as
                    produced by ``read()``. None or empty gives False.
                snr_threshold: An SNR must exceed this value to be counted.
                min_sats: Number of strong satellites required.

            Returns:
                True when both conditions hold, otherwise False.
            """
            # read() returns None when the module has no data.
            if not nmea_dict:
                return False
            if not self._has_position_fix(nmea_dict):
                return False

            strong_sats = 0
            for sentence_id in ("$GPGSV", "$GBGSV", "$GAGSV"):
                for snr in self._iter_snrs(nmea_dict.get(sentence_id, ())):
                    if snr > snr_threshold:
                        strong_sats += 1
                        if strong_sats >= min_sats:
                            return True
            return False

        @staticmethod
        def _has_position_fix(nmea_dict):
            """Return True when any $GNGSA line has fix mode "2" or "3" in field 2."""
            for line in nmea_dict.get("$GNGSA", ()):
                fields = line.split(",")
                if len(fields) > 2 and fields[2] in ("2", "3"):
                    return True
            return False

        @staticmethod
        def _iter_snrs(lines):
            """Yield every numeric SNR value found in the given GSV lines."""
            for line in lines:
                # Stored lines still end in "*hh". Cut that off first,
                # otherwise a trailing SNR such as "20*4F" fails isdigit()
                # and that satellite is silently ignored.
                star_index = line.rfind("*")
                if star_index != -1:
                    line = line[:star_index]
                fields = line.split(",")
                # Satellite blocks start at field 4 and are four fields wide;
                # the SNR is the last field of each block (7, 11, 15, ...).
                for snr_index in range(7, len(fields), 4):
                    snr_str = fields[snr_index]
                    if snr_str.isdigit():
                        yield int(snr_str)

        def update_interval(self, interval):
            """Replace the stored update interval."""
            self.interval = interval
    

QTH平台客户端

QthClient 类是一个用于与 QTH(Quantum Technology Hub)平台进行通信的客户端类。它负责初始化、启动和管理与 QTH 平台的连接,并处理来自平台的各种事件和回调。

# Module-level logger, named after the current module.
logger = getLogger(__name__)


class QthClient(object):
    """Application extension that talks to the QTH platform through ``Qth``.

    The instance is also a context manager: entering acquires ``opt_lock``
    and leaving releases it, so callers can serialise their use of the client
    with ``with app.qth_client: ...``.
    """

    def __init__(self, app=None):
        """Create the operation lock and optionally bind to ``app``."""
        self.opt_lock = Lock()
        if app:
            self.init_app(app)

    def __enter__(self):
        self.opt_lock.acquire()
        return self

    def __exit__(self, *args, **kwargs):
        self.opt_lock.release()

    def init_app(self, app):
        """Register with ``app`` and configure ``Qth`` from the app config.

        Reads ``QTH_PRODUCT_KEY``, ``QTH_PRODUCT_SECRET`` and ``QTH_SERVER``
        from ``app.config`` and installs this object's callbacks.
        """
        app.register("qth_client", self)
        Qth.init()
        Qth.setProductInfo(app.config["QTH_PRODUCT_KEY"], app.config["QTH_PRODUCT_SECRET"])
        Qth.setServer(app.config["QTH_SERVER"])
        Qth.setEventCb(
            {
                "devEvent": self.eventCallback,
                "recvTrans": self.recvTransCallback,
                "recvTsl": self.recvTslCallback,
                "readTsl": self.readTslCallback,
                "readTslServer": self.recvTslServerCallback,
                "ota": {
                    "otaPlan": self.otaPlanCallback,
                    "fotaResult": self.fotaResultCallback,
                },
            }
        )

    def load(self):
        """Extension entry point; simply starts the client."""
        self.start()

    def start(self):
        """Start ``Qth`` and block, polling every 3 seconds, until the state is OK."""
        Qth.start()
        while not self.isStatusOk():
            utime.sleep(3)

    def stop(self):
        """Stop ``Qth``."""
        Qth.stop()

    def sendTsl(self, mode, value):
        """Forward to ``Qth.sendTsl`` and return its result."""
        return Qth.sendTsl(mode, value)

    def isStatusOk(self):
        """Return ``Qth.state()``; ``start`` treats a truthy value as connected."""
        return Qth.state()

    def sendLbs(self, lbs_data):
        """Report LBS location data through ``Qth.sendOutsideLocation``."""
        return Qth.sendOutsideLocation(lbs_data)

    def sendGnss(self, nmea_data):
        """Report GNSS NMEA data through ``Qth.sendOutsideLocation``."""
        return Qth.sendOutsideLocation(nmea_data)

    def eventCallback(self, event, result):
        """Log a device event; event 2 with result 0 triggers an OTA request."""
        logger.info("dev event:{} result:{}".format(event, result))
        if event == 2 and result == 0:
            Qth.otaRequest()

    def recvTransCallback(self, value):
        """Send received transparent data back with ``Qth.sendTrans`` and log it."""
        ret = Qth.sendTrans(1, value)
        logger.info("recvTrans value:{} ret:{}".format(value, ret))

    def recvTslCallback(self, value):
        """Handle TSL values pushed by the platform.

        Every entry is logged. Command id 8 carries a new reporting interval,
        which is forwarded to the location services.

        Args:
            value: Mapping of command id to value.
        """
        logger.info("recvTsl:{}".format(value))
        for cmdId, val in value.items():
            logger.info("recvTsl {}:{}".format(cmdId, val))

            if cmdId == 8:
                app = CurrentApp()
                # create_app currently leaves lbs_service disabled, so a
                # missing service is skipped instead of aborting the callback.
                # NOTE(review): assumes unregistered services raise
                # AttributeError on lookup - confirm against Application.
                for service_name in ("gnss_service", "lbs_service"):
                    service = getattr(app, service_name, None)
                    if service is None:
                        logger.warn("{} is not registered, skip interval update".format(service_name))
                        continue
                    service.update_interval(val)

    def readTslCallback(self, ids, pkgId):
        """Answer a platform read request for TSL ids 3-7 with sensor readings.

        Ids 3/4 map to temperature 1 / humidity, 5/6 to temperature 2 /
        pressure and 7 to an ``{1: r, 2: g, 3: b}`` colour reading. Unknown
        ids are ignored.

        Args:
            ids: Iterable of requested TSL ids.
            pkgId: Package id echoed back in ``Qth.ackTsl``.
        """
        logger.info("readTsl ids:{} pkgId:{}".format(ids, pkgId))
        # NOTE(review): create_app has sensor_service commented out; confirm
        # it is registered before this callback can fire.
        sensors = CurrentApp().sensor_service
        temp1, humi = sensors.get_temp1_and_humi()
        press, temp2 = sensors.get_press_and_temp2()
        r, g, b = sensors.get_rgb888()

        readings = {
            3: temp1,
            4: humi,
            5: temp2,
            6: press,
            7: {1: r, 2: g, 3: b},
        }
        value = {}
        for tsl_id in ids:
            if tsl_id in readings:
                value[tsl_id] = readings[tsl_id]
        Qth.ackTsl(1, value, pkgId)

    def recvTslServerCallback(self, serverId, value, pkgId):
        """Log a TSL service call and acknowledge it by echoing ``value``."""
        logger.info("recvTslServer serverId:{} value:{} pkgId:{}".format(serverId, value, pkgId))
        Qth.ackTslServer(1, serverId, value, pkgId)

    def otaPlanCallback(self, plans):
        """Log the OTA plan and respond with ``Qth.otaAction(1)``."""
        logger.info("otaPlan:{}".format(plans))
        Qth.otaAction(1)

    def fotaResultCallback(self, comp_no, result):
        """Log the FOTA result for a component."""
        logger.info("fotaResult comp_no:{} result:{}".format(comp_no, result))

    def sotaInfoCallback(self, comp_no, version, url, md5, crc):
        """Log SOTA package information and report the MCU version.

        NOTE(review): nothing visible in this class registers this callback
        before its first invocation - confirm where it is hooked up.
        """
        logger.info("sotaInfo comp_no:{} version:{} url:{} md5:{} crc:{}".format(comp_no, version, url, md5, crc))
        # Once the firmware has been downloaded from the url and the MCU has
        # finished updating, the MCU's latest version must be fetched and
        # reported through setMcuVer.
        Qth.setMcuVer("MCU1", "V1.0.0", self.sotaInfoCallback, self.sotaResultCallback)

    def sotaResultCallback(self, comp_no, result):
        """Log the SOTA result for a component.

        ``self`` was missing from the signature although the method is handed
        out as a bound method, so every invocation raised TypeError.
        """
        logger.info("sotaResult comp_no:{} result:{}".format(comp_no, result))

电池服务

battery_service.init_app初始化电池服务模块。

# Module-level logger, named after the current module.
logger = getLogger(__name__)

# Open-circuit-voltage lookup tables:
#   battery type -> temperature key -> {voltage: state of charge in percent}.
# BatteryService picks key 55 for temperatures above 30, key 0 below 10 and
# key 20 otherwise.
# NOTE(review): voltages are presumably millivolts as returned by
# Power.getVbatt() - confirm.
BATTERY_OCV_TABLE = {
    "nix_coy_mnzo2": {
        55: {
            4152: 100, 4083: 95, 4023: 90, 3967: 85, 3915: 80, 3864: 75, 3816: 70, 3773: 65, 3737: 60, 3685: 55,
            3656: 50, 3638: 45, 3625: 40, 3612: 35, 3596: 30, 3564: 25, 3534: 20, 3492: 15, 3457: 10, 3410: 5, 3380: 0,
        },
        20: {
            4143: 100, 4079: 95, 4023: 90, 3972: 85, 3923: 80, 3876: 75, 3831: 70, 3790: 65, 3754: 60, 3720: 55,
            3680: 50, 3652: 45, 3634: 40, 3621: 35, 3608: 30, 3595: 25, 3579: 20, 3548: 15, 3511: 10, 3468: 5, 3430: 0,
        },
        0: {
            4147: 100, 4089: 95, 4038: 90, 3990: 85, 3944: 80, 3899: 75, 3853: 70, 3811: 65, 3774: 60, 3741: 55,
            3708: 50, 3675: 45, 3651: 40, 3633: 35, 3620: 30, 3608: 25, 3597: 20, 3585: 15, 3571: 10, 3550: 5, 3500: 0,
        },
    },
}


class BatteryService(object):
    """Application extension that estimates battery charge and reports it.

    The state of charge is derived from the averaged battery voltage and the
    last temperature passed to ``set_temp``, using ``BATTERY_OCV_TABLE``.
    """

    def __init__(self, app=None):
        """Store defaults and optionally bind to ``app`` right away."""
        self.interval = 30
        self.__net = net
        if app is not None:
            self.init_app(app, battery_ocv="nix_coy_mnzo2")

    def __str__(self):
        return '{}'.format(type(self).__name__)

    def init_app(self, app, battery_ocv="nix_coy_mnzo2"):
        """Bind to ``app``, reset the measurement state and register.

        Args:
            app: Application providing ``gnss_sleep_event``, ``config`` and
                ``register``.
            battery_ocv: Key of the battery type in ``BATTERY_OCV_TABLE``.

        Raises:
            TypeError: ``battery_ocv`` has no entry in ``BATTERY_OCV_TABLE``.
        """
        self.gnss_sleep_event = app.gnss_sleep_event
        self.interval = app.config["SLEEP_INTERVAL_SECONDS"]
        self.__energy = 100
        self.__temp = 30
        self.__vbatt_count = 100
        if not BATTERY_OCV_TABLE.get(battery_ocv):
            raise TypeError("Battery OCV %s is not support." % battery_ocv)
        self.__battery_ocv = battery_ocv
        app.register('battery_service', self)

    def load(self):
        """Start the background thread that runs ``start_update``."""
        logger.info('loading {} extension, init battery will take some seconds'.format(self))
        Thread(target=self.start_update).start()

    def __get_soc_from_dict(self, key, volt_arg):
        """Look up the state of charge for ``volt_arg`` in one temperature table.

        Args:
            key: Temperature key of the table to use (55, 20 or 0).
            volt_arg: Measured battery voltage.

        Returns:
            The top table value when ``volt_arg`` is above the highest entry,
            0 when it is not above the lowest entry, otherwise a value
            interpolated linearly (integer arithmetic) between the two
            neighbouring entries. None when the battery type has no table for
            ``key``.
        """
        table = BATTERY_OCV_TABLE[self.__battery_ocv].get(key)
        if not table:
            return None

        upper_volt = None  # nearest table voltage at or above volt_arg
        for volt in sorted(table, reverse=True):
            if volt_arg > volt:
                if upper_volt is None:
                    # Above the highest table voltage.
                    return table[volt]
                soc_low = table[volt]
                soc_high = table[upper_volt]
                return soc_high - (soc_high - soc_low) * (upper_volt - volt_arg) // (upper_volt - volt)
            upper_volt = volt
        # Not above the lowest table voltage.
        return 0

    def __get_soc(self, temp, volt_arg):
        """Pick the temperature table for ``temp`` and look up ``volt_arg``."""
        if temp > 30:
            return self.__get_soc_from_dict(55, volt_arg)
        elif temp < 10:
            return self.__get_soc_from_dict(0, volt_arg)
        else:
            return self.__get_soc_from_dict(20, volt_arg)

    def __get_power_vbatt(self):
        """Return the integer average of ``__vbatt_count`` ``Power.getVbatt()`` readings."""
        total = sum(Power.getVbatt() for _ in range(self.__vbatt_count))
        return int(total / self.__vbatt_count)

    def set_temp(self, temp):
        """Store the current temperature; True when ``temp`` is an int or float."""
        if isinstance(temp, (int, float)):
            self.__temp = temp
            return True
        return False

    def start_update(self):
        """Measure and report the state of charge in an endless loop.

        While ``quecgnss.getPriority()`` is falsy the loop only re-checks
        every 0.1 s. Otherwise it computes the state of charge, sends it as
        TSL id 4 (up to three attempts while holding the QTH client lock) and
        sleeps for ``interval`` seconds.
        """
        while True:
            if not quecgnss.getPriority():
                utime.sleep(0.1)
                continue

            self.__energy = self.__get_soc(self.__temp, self.__get_power_vbatt())
            data = {4: self.__energy}
            with CurrentApp().qth_client:
                for _ in range(3):
                    if CurrentApp().qth_client.sendTsl(1, data):
                        logger.debug("send battery data to qth server success")
                        break
            utime.sleep(self.interval)