# -*- coding:utf-8 -*- """ @Author : xuxingchen @Contact : xuxingchen@sinochem.com @Desc : None """ import json import traceback import time from config import DB_PATH from device.message import ErrorResp, DRRespItem, BaseResp, SDRRespItem, SimpleResp from models.devices import DevicesTable, DeviceRegister from models.products import ProductsTable from utils.misc import generate_captcha_text, now_datetime_second from utils import logger from utils.database import SQLiteDatabaseEngine, BaseTable from utils.misc import UserData def on_message(_client, userdata: UserData, message): try: logger.Logger.debug("💡 接收到新数据,执行 on_message 中 ...", log_path=None) msg = json.loads(message.payload.decode().replace("\x00", "")) logger.Logger.info(f"{message.topic}: {msg}") userdata.set_topic(message.topic) if userdata.table_handler is None: _db = SQLiteDatabaseEngine(db_path=DB_PATH) userdata.set_table_handler(BaseTable(_db.connection, _db.cursor)) if userdata.token: if "messageId" in msg.keys() and userdata.token == msg["messageId"]: # 兼容aiot平台 userdata.set_status_add("status", True) userdata.set_status_add("response", msg) else: auto_service(msg, userdata) except Exception as e: logger.Logger.error(f"{type(e).__name__}, {e}") if logger.DEBUG: traceback.print_exc() class BaseService: def handle(self, **kwargs): """must be `override`""" pass class Services: """业务逻辑入口""" class DeviceRegisterService(BaseService): """直连设备、网关设备注册""" def handle(self, msg: dict, userdata: UserData): logger.Logger.debug("设备注册请求已接收,正在执行注册 ...", log_path=None) client = userdata.clients["center"][0] topic = f"{userdata.topic}_resp" userdata.set_topic(topic) message_id = msg["messageId"] if "messageId" in msg.keys() else "1" try: # 获取产品信息 params = msg["params"] product = ProductsTable.get_product_info(userdata.table_handler, params["productId"]) if product is None: logger.Logger.error("DeviceRegisterService -> 产品ID不存在") message = ErrorResp(message_id, 601) userdata.set_message(message.dict()) client.publish(topic, message()) return if product.node_type not in ["网关设备", "直连设备"]: logger.Logger.error("DeviceRegisterService -> 节点类型错误") message = ErrorResp(message_id, 503) userdata.set_message(message.dict()) client.publish(topic, message()) return sct = generate_captcha_text(32) dr = DeviceRegister( device_mac=params["deviceName"], device_name=params["displayName"] if params["displayName"] else "", device_desc=params["desc"] if params["desc"] else "", third_local_device_id=params["thirdLocalDeviceId"] if params["thirdLocalDeviceId"] else "", device_sct=sct, gateway_id=None, gateway_name=None, product_name=product.product_name, node_type=product.node_type ) device_id = DevicesTable.insert_by_device_register(userdata.table_handler, dr) resp = BaseResp(message_id, DRRespItem( str(device_id), params["displayName"], sct, 0 )) userdata.set_message(resp.dict()) client.publish(topic, resp()) except (KeyError, TypeError): logger.Logger.error("DeviceRegisterService -> 关键参数缺失") message = ErrorResp(message_id, 401) userdata.set_message(message.dict()) client.publish(topic, message()) class SubDeviceRegisterService(BaseService): """子设备注册""" def handle(self, msg: dict, userdata: UserData): logger.Logger.debug("子设备注册请求已接收,正在执行注册 ...", log_path=None) client = userdata.clients["center"][0] topic = f"{userdata.topic}_resp" userdata.set_topic(topic) message_id = msg["messageId"] if "messageId" in msg.keys() else "1" try: gateway_id = int(userdata.topic.split("/")[2]) gateway_info = DevicesTable.get_device_info(userdata.table_handler, gateway_id) if gateway_info is None: # 确保当前主题中的设备ID存在 logger.Logger.error("SubDeviceRegisterService -> 并无对应上级设备") message = ErrorResp(message_id, 601) userdata.set_message(message.dict()) client.publish(topic, message()) return if gateway_info.node_type != "网关设备": # 确保当前主题是网关设备 logger.Logger.error("SubDeviceRegisterService -> 节点类型错误") message = ErrorResp(message_id, 503) userdata.set_message(message.dict()) client.publish(topic, message()) return data = [] for params in msg["params"]: product_info = ProductsTable.get_product_info(userdata.table_handler, params["productId"]) if product_info is None: logger.Logger.error("SubDeviceRegisterService -> 产品ID不存在") data.append({"code": 601}) continue if product_info.node_type != "网关子设备": # 确保注册的产品是网关子设备 logger.Logger.error("SubDeviceRegisterService -> 产品ID节点类型错误") data.append({"code": 503}) continue dr = DeviceRegister( device_mac=params["deviceName"], device_name=params["displayName"] if params["displayName"] else "", device_desc=params["desc"] if params["desc"] else "", third_local_device_id=params["thirdLocalDeviceId"] if params["thirdLocalDeviceId"] else "", device_sct=None, gateway_id=gateway_id, gateway_name=gateway_info.device_name, product_name=product_info.product_name, node_type=product_info.node_type ) device_id = DevicesTable.insert_by_device_register(userdata.table_handler, dr) data.append(SDRRespItem( str(device_id), params["displayName"] if params["displayName"] else "", 0 )) message = BaseResp(message_id, data) userdata.set_message(message.dict()) client.publish(topic, message()) except (KeyError, TypeError) as e: logger.Logger.error("DeviceRegisterService -> 关键参数缺失") logger.Logger.error(f"{type(e).__name__}, {e}") if logger.DEBUG: traceback.print_exc() message = ErrorResp(message_id, 401) userdata.set_message(message) client.publish(topic, {"messageId": message_id, "time": int(time.time() * 1000), "data": [{"code": 401}]}) class UpdateDeviceStatusService(BaseService): """设备在线和离线状态的变更""" def handle(self, msg: dict, userdata: UserData): message_type = userdata.topic.split("/")[-1] if message_type == "online": device_status = "在线" else: device_status = "离线" logger.Logger.debug(f"设备状态更新请求已接收,正在执行{device_status} ...", log_path=None) client = userdata.clients["center"][0] topic = f"{userdata.topic}_resp" userdata.set_topic(topic) message_id = msg["messageId"] if "messageId" in msg.keys() else "1" try: device_id = int(userdata.topic.split("/")[2]) device_info = DevicesTable.get_device_info(userdata.table_handler, device_id) if device_info is None: # 确保当前主题中的设备ID存在 logger.Logger.error("UpdateDeviceStatusService -> 并无对应设备") message = ErrorResp(message_id, 40014) userdata.set_message(message.dict()) client.publish(topic, message()) return if message_type == "offline" and device_info.node_type == "网关设备": DevicesTable.offline_gateway(userdata.table_handler, device_id) else: DevicesTable.update_device_status(userdata.table_handler, device_id, device_status) message = SimpleResp(message_id, 0) userdata.set_message(message.dict()) client.publish(topic, message()) except (KeyError, TypeError): logger.Logger.error("UpdateDeviceStatusService -> 关键参数缺失") message = SimpleResp(message_id, 401) userdata.set_message(message.dict()) client.publish(topic, message()) class PropertyService(BaseService): """设备属性上报事件""" def handle(self, msg: dict, userdata: UserData): logger.Logger.debug("设备属性上报事件已接收,正在处理 ...", log_path=None) client = userdata.clients["center"][0] topic = f"{userdata.topic}_resp" userdata.set_topic(topic) message_id = msg["messageId"] if "messageId" in msg.keys() else "1" try: device_id = int(userdata.topic.split("/")[2]) device_info = DevicesTable.get_device_info(userdata.table_handler, device_id) if device_info is None: # 确保当前主题中的设备ID存在 logger.Logger.info("PropertyService -> 并无对应设备") logger.Logger.debug("忽略的事件上报请求已接收,即将给予成功回馈 ...", log_path=None) message = SimpleResp(message_id, 0) userdata.set_message(message.dict()) client.publish(topic, message()) return if "停车" not in device_info.product_name: # 只处理停车场设备的信息上报 logger.Logger.debug("忽略的事件上报请求已接收,即将给予成功回馈 ...", log_path=None) message = SimpleResp(message_id, 0) userdata.set_message(message.dict()) client.publish(topic, message()) return data = msg["values"] nt = now_datetime_second() # 判断是否存在对应停车场,若不存在进行新建,若存在则更新除ID之外的信息 userdata.table_handler.execute( """ INSERT INTO parkinglots (id, number, name, type, update_datetime) VALUES (?, ?, ?, 'parkinglot', ?) ON CONFLICT (id, type) DO UPDATE SET number = ?, name = ?, update_datetime = ? """, (data["parkinglot_no"], device_id, data["parkinglot_name"], nt, device_id, data["parkinglot_name"], nt) ) message = SimpleResp(message_id, 0) userdata.set_message(message.dict()) client.publish(topic, message()) except (KeyError, TypeError): logger.Logger.error("PropertyService -> 关键参数缺失") message = SimpleResp(message_id, 401) userdata.set_message(message.dict()) client.publish(topic, message()) class AreaService(BaseService): """车场区域信息变化同步事件""" def handle(self, msg: dict, userdata: UserData): logger.Logger.debug("车场区域信息变化同步事件已接收,正在处理 ...", log_path=None) client = userdata.clients["center"][0] topic = f"{userdata.topic}_resp" userdata.set_topic(topic) message_id = msg["messageId"] if "messageId" in msg.keys() else "1" try: device_id = int(userdata.topic.split("/")[2]) device_info = DevicesTable.get_device_info(userdata.table_handler, device_id) if device_info is None: # 确保当前主题中的设备ID存在 logger.Logger.error("AreaService -> 并无对应设备") message = SimpleResp(message_id, 40014) userdata.set_message(message.dict()) client.publish(topic, message()) return if "停车" not in device_info.product_name: # 确保当前主题是停车场设备 logger.Logger.error("AreaService -> 节点类型错误") message = SimpleResp(message_id, 503) userdata.set_message(message.dict()) client.publish(topic, message()) return params = msg["params"] # ["区域ID","父区域ID","区域编码","区域名称","车位总数"] # {"area_name":"地面","area_no":"1","place_numbers":"60","area_id":"1","parent_area_id":"1"} if params["parent_area_id"] == params["area_id"]: params["parent_area_id"] = None nt = now_datetime_second() userdata.table_handler.execute( "SELECT id FROM parkinglots WHERE number = ? AND type = 'parkinglot'", (device_id,)) res = userdata.table_handler.cursor.fetchall() if res: parkinglot_number = res[0][0] # 判断是否存在对应区域,若不存在进行新建,若存在则更新除ID之外的信息 userdata.table_handler.execute( """ INSERT OR REPLACE INTO parkinglots (id, number, name, type, area_id, parkinglot_id, update_datetime) VALUES (?, ?, ?, 'area', ?, ?, ?) """, (params["area_id"], params["area_no"], params["area_name"], params["parent_area_id"], parkinglot_number, nt) ) message = SimpleResp(message_id, 0) userdata.set_message(message.dict()) client.publish(topic, message()) else: message = SimpleResp(message_id, 40014) # 对应设备属性没有上报,数据库中未记载设备编码 userdata.set_message(message.dict()) client.publish(topic, message()) return except (KeyError, TypeError): logger.Logger.error("AreaService -> 关键参数缺失") message = SimpleResp(message_id, 401) userdata.set_message(message.dict()) client.publish(topic, message()) class ChannelService(BaseService): """车场通道信息变化同步事件""" def handle(self, msg: dict, userdata: UserData): logger.Logger.debug("车场通道信息变化同步事件已接收,正在执行处理 ...", log_path=None) client = userdata.clients["center"][0] topic = f"{userdata.topic}_resp" userdata.set_topic(topic) message_id = msg["messageId"] if "messageId" in msg.keys() else "1" try: device_id = int(userdata.topic.split("/")[2]) device_info = DevicesTable.get_device_info(userdata.table_handler, device_id) if device_info is None: # 确保当前主题中的设备ID存在 logger.Logger.error("ChannelService -> 并无对应设备") message = SimpleResp(message_id, 40014) userdata.set_message(message.dict()) client.publish(topic, message()) return if "停车" not in device_info.product_name: # 确保当前主题是停车场设备 logger.Logger.error("ChannelService -> 节点类型错误") message = SimpleResp(message_id, 503) userdata.set_message(message.dict()) client.publish(topic, message()) return params = msg["params"] # ["区域ID","通道ID","通道编码","通道名称","通道类型"] # {"channel_name":"测试出口","channel_no":"13","area_id":"1","channel_type":"1","channel_id":"13"} channel_type = int(params["channel_type"]) if channel_type == 0: params["channel_type"] = "入口" elif channel_type == 1: params["channel_type"] = "出口" else: params["channel_type"] = "出入口" nt = now_datetime_second() userdata.table_handler.execute( "SELECT id FROM parkinglots WHERE number = ? AND type = 'parkinglot'", (device_id,)) res = userdata.table_handler.cursor.fetchall() if res: parkinglot_number = res[0][0] # 判断是否存在对应区域,若不存在进行新建,若存在则更新除ID之外的信息 userdata.table_handler.execute( """ INSERT OR REPLACE INTO parkinglots (id, number, name, type, area_id, parkinglot_id, channel_type, update_datetime) VALUES (?, ?, ?, 'channel', ?, ?, ?, ?) """, (params["channel_id"], params["channel_no"], params["channel_name"], params["area_id"], parkinglot_number, params["channel_type"], nt) ) message = SimpleResp(message_id, 0) userdata.set_message(message.dict()) client.publish(topic, message()) else: message = SimpleResp(message_id, 40014) # 对应设备属性没有上报,数据库中未记载设备编码 userdata.set_message(message.dict()) client.publish(topic, message()) return except (KeyError, TypeError): logger.Logger.error("ChannelService -> 关键参数缺失") message = SimpleResp(message_id, 401) userdata.set_message(message.dict()) client.publish(topic, message()) class GateService(BaseService): """道闸信息同步""" def handle(self, msg: dict, userdata: UserData): logger.Logger.debug("道闸信息同步事件已接收,正在执行处理 ...", log_path=None) client = userdata.clients["center"][0] topic = f"{userdata.topic}_resp" userdata.set_topic(topic) message_id = msg["messageId"] if "messageId" in msg.keys() else "1" try: device_id = int(userdata.topic.split("/")[2]) device_info = DevicesTable.get_device_info(userdata.table_handler, device_id) if device_info is None: # 确保当前主题中的设备ID存在 logger.Logger.error("ChanGateService -> 并无对应设备") message = SimpleResp(message_id, 40014) userdata.set_message(message.dict()) client.publish(topic, message()) return if "停车" not in device_info.product_name: # 确保当前主题是停车场设备 logger.Logger.error("GateService -> 节点类型错误") message = SimpleResp(message_id, 503) userdata.set_message(message.dict()) client.publish(topic, message()) return params = msg["params"] # ["区域ID","通道ID","道闸编码","道闸ID","道闸名称","道闸品牌","道闸状态","运行状态","是否在线"] # {"gate_id":"13","gate_name":"测试出口","is_online":"0","gate_no":"13","area_id":"1","channel_id":"13"} if "is_online" in params.keys(): is_online = int(params["is_online"]) if is_online == 0: params["is_online"] = "下线" else: params["is_online"] = "上线" else: params["is_online"] = "" if "gate_status" in params.keys(): gate_status = int(params["gate_status"]) if gate_status == 0: params["gate_status"] = "关闭" elif gate_status == 1: params["gate_status"] = "开启" elif gate_status == 2: params["gate_status"] = "常开" else: params["gate_status"] = "常闭" else: params["gate_status"] = "" if "running_status" in params.keys(): running_status = int(params["running_status"]) if running_status == 0: params["running_status"] = "正常" elif running_status == 1: params["running_status"] = "故障" else: params["running_status"] = "" nt = now_datetime_second() userdata.table_handler.execute( "SELECT id FROM parkinglots WHERE number = ? AND type = 'parkinglot'", (device_id,)) res = userdata.table_handler.cursor.fetchall() if res: parkinglot_number = res[0][0] # 判断是否存在对应区域,若不存在进行新建,若存在则更新除ID之外的信息 userdata.table_handler.execute( """ INSERT OR REPLACE INTO parkinglots (id, number, name, type, area_id, parkinglot_id, channel_id, is_online, gate_status, running_status, update_datetime) VALUES (?, ?, ?, 'gate', ?, ?, ?, ?, ?, ?, ?) """, (params["gate_id"], params["gate_no"], params["gate_name"], params.get("area_id", ""), parkinglot_number, params.get("channel_id", ""), params["is_online"], params["gate_status"], params["running_status"], nt) ) message = SimpleResp(message_id, 0) userdata.set_message(message.dict()) client.publish(topic, message()) else: message = SimpleResp(message_id, 40014) # 对应设备属性没有上报,数据库中未记载设备编码 userdata.set_message(message.dict()) client.publish(topic, message()) return except (KeyError, TypeError): logger.Logger.error("GateService -> 关键参数缺失") message = SimpleResp(message_id, 401) userdata.set_message(message.dict()) client.publish(topic, message()) class NoHandlePostService(BaseService): """忽略的事件上报,直接返回成功的反馈""" def handle(self, msg: dict, userdata: UserData): logger.Logger.debug("忽略的事件上报请求已接收,即将给予成功回馈 ...", log_path=None) client = userdata.clients["center"][0] topic = f"{userdata.topic}_resp" userdata.set_topic(topic) message_id = msg["messageId"] if "messageId" in msg.keys() else "1" try: device_id = int(userdata.topic.split("/")[2]) device_info = DevicesTable.get_device_info(userdata.table_handler, device_id) if device_info is None: # 确保当前主题中的设备ID存在 logger.Logger.error("SubDeviceRegisterService -> 并无对应设备") message = SimpleResp(message_id, 601) userdata.set_message(message.dict()) client.publish(topic, message()) return message = SimpleResp(message_id, 0) userdata.set_message(message.dict()) client.publish(topic, message()) except (KeyError, TypeError): logger.Logger.error("UpdateDeviceStatusService -> 关键参数缺失") message = SimpleResp(message_id, 401) userdata.set_message(message.dict()) client.publish(topic, message()) class UpdateFixedCardService(BaseService): def handle(self, **kwargs): pass class VehicleRegistration(BaseService): def handle(self, **kwargs): pass def auto_service(msg: dict, userdata: UserData): """自动化加载不同的服务层""" message_type = userdata.topic.split("/")[-1] servicer = None if message_type == "register": # 设备注册 if userdata.topic.split("/")[-2] == "sub": servicer = Services.SubDeviceRegisterService() else: servicer = Services.DeviceRegisterService() elif message_type in ["online", "offline"]: # 设备上下线 servicer = Services.UpdateDeviceStatusService() elif message_type == "post": # 事件上报,一般是通行事件,不做处理,直接返回成功 if userdata.topic.split("/")[-2] == "property": servicer = Services.PropertyService() elif "eventCode" in msg.keys() and msg["eventCode"] in ["area", "channel", "gate"]: if msg["eventCode"] == "area": servicer = Services.AreaService() elif msg["eventCode"] == "channel": servicer = Services.ChannelService() elif msg["eventCode"] == "gate": servicer = Services.GateService() else: servicer = Services.NoHandlePostService() # 构建服务层实体并进行调用 if servicer is not None: servicer.handle(msg, userdata)