history-project/meian/devices/common_service.py
2024-10-24 09:04:38 +08:00

731 lines
40 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding:utf-8 -*-
"""
@File : common_service
@Author : xuxingchen
@Version : 2.0
@Contact : xuxingchen@sinochem.com
@Desc : 业务逻辑服务层
"""
import json
import time
import traceback
import requests
import devices.meian_model as meian_data_entity
import devices.yunfu_model as yunfu_data_entity
import logger
from logger import Logger, speed_ms
from devices.meian_db import DeviceTable, RegisterTable, GatewayTable, HeartBeatTable, UserInfoTable, RecordTable
from devices.common_model import ErrorCode, UserData, UserInfo
from utils import generate_token, to_obj, datetime_to_timestamp, generate_time_token, encode_to_base64, \
extract_building_unit, extract_number
from config import TIMEOUT_SECOND, SLEEP_TIME, BACKEND_URL, BACKEND_ID, BACKEND_SECRET, GATEWAY_CONFIG
class BaseService:
def __init__(self):
self.table_handler = None
self.topic = None
self.clients = None
def set_meta(self, userdata: UserData):
self.table_handler = userdata.table_handler
self.topic = userdata.topic
self.clients = userdata.clients
def handle(self, **kwargs):
"""must be `override`"""
pass
def get_owner_house_floor(user_id, project_code: str, device_position_desc: str) -> int:
"""根据用户ID获取用户房产信息提取楼层信息并返回
1. 该功能只支持业主使用,访客查询不到,默认返回
"""
token_url = f"{BACKEND_URL}/v2/accesskey_auth"
owner_info_url = f"{BACKEND_URL}/v3/realty-master-data/owners/{user_id}"
house_info_url = f"{BACKEND_URL}/v3/realty-master-data/houses"
# get token
data = {'id': BACKEND_ID, 'secret': BACKEND_SECRET}
token_response = requests.post(token_url, json=data)
if token_response.status_code == 200:
logger.Logger.debug(f"token 请求成功,响应内容:{token_response.json()}")
token = token_response.json()["access_token"]
else:
logger.Logger.error(f"token 请求失败,状态码: {token_response.status_code}")
raise Exception(f"token 请求失败,状态码: {token_response.status_code}")
# get house ids
headers = {
"Accept": "application/json",
"Access-Token": token,
"Content-Type": "application/json;charset=UTF-8"
}
owner_info_response = requests.get(owner_info_url, headers=headers)
if owner_info_response.status_code == 200:
owner_info = owner_info_response.json()
logger.Logger.debug(f"owner_info 请求成功,响应内容:{owner_info}")
# 根据 project_code 获取 project_id
if GATEWAY_CONFIG[project_code][2] == "" or owner_info["data"]["project_id"] == GATEWAY_CONFIG[project_code][2]:
house_ids = owner_info["data"]["house_ids"]
else:
logger.Logger.error(f"project id 不匹配,"
f"预置:{GATEWAY_CONFIG[project_code][2]}, 用户信息所属:{owner_info['data']['project_id']}")
raise Exception(f"project id 不匹配")
elif owner_info_response.status_code == 404 and owner_info_response.json()["msg"] == "OWNER_NOT_EXISTS":
logger.Logger.warn(f"owner_info 请求成功但用户ID不存在非住户")
return 1
else:
logger.Logger.error(f"token 请求失败,状态码: {token_response.status_code}")
raise Exception(f"token 请求失败,状态码: {token_response.status_code}")
# 结合设备注册备注 获取楼层信息
data = {
"query": {
"id": {"$in": house_ids}
}
}
house_info_response = requests.post(house_info_url, json=data, headers=headers)
if house_info_response.status_code == 200:
logger.Logger.debug(f"house_info 请求成功,响应内容:{house_info_response.json()}")
building_number, unit_number = extract_building_unit(device_position_desc)
house_info = house_info_response.json()["data"]["list"]
floor = 1
for house_item in house_info:
building_name = extract_number(house_item["building_name"]).zfill(2)
unit_name = extract_number(house_item["unit_name"]).zfill(2)
if building_name == building_number and unit_name == unit_number:
floor = int(house_item["floor_name"])
break
else:
logger.Logger.error(f"house_info 请求失败,状态码: {token_response.status_code}")
raise Exception(f"house_info 请求失败,状态码: {token_response.status_code}")
return floor
class Services:
"""业务逻辑入口"""
class HeartBeatService(BaseService):
def handle(self, msg_obj: meian_data_entity.HeartBeat):
# Logger.debug("设备心跳已接收,正在执行上线 ...", log_path=None)
last_time = HeartBeatTable.get_last_time(self.table_handler, msg_obj.device_id)
# 若无心跳记录或者距离上次心跳记录时间达到5分钟
if (last_time is None or
(time.time() - float(
HeartBeatTable.get_last_time(self.table_handler, msg_obj.device_id))) > (60. * 5.)):
status = HeartBeatTable.update(self.table_handler, msg_obj, self.topic)
if status:
# 执行上线操作
project_code = DeviceTable.get_project_code(self.table_handler, msg_obj.device_id)
if project_code:
gateway_id, _ = GatewayTable.get_gateway(self.table_handler, project_code)
if gateway_id is None:
Logger.error("网关不存在")
return -1
aiot_id = DeviceTable.get_aiot_id(self.table_handler, msg_obj.device_id)
if aiot_id is None:
Logger.warn("设备未注册,不执行上线操作")
return -1
topic = f"/jmlink/{aiot_id}/comm/online"
topic_resp = f"/jmlink/{aiot_id}/comm/online_resp"
s = time.time()
# 获取客户端
client_1 = self.clients[gateway_id][0]
userdata = self.clients[gateway_id][1]
userdata.set_status_add("status", False)
userdata.set_status_add("start_timestamp", time.time())
try:
# 订阅回传
client_1.subscribe(topic_resp)
Logger.debug(f"subscribe topics: {topic_resp}")
# 发布事件
token = generate_token()
userdata.set_token(token)
online_json = json.dumps(yunfu_data_entity.Online(messageId=token).__dict__)
client_1.publish(topic, online_json)
while True:
if userdata.status["status"]:
# 拿到结果后写入到数据库中更新数据
if "code" in userdata.status["response"].keys():
error_code = userdata.status["response"]["code"]
if error_code == 0:
Logger.info(f"设备 - {aiot_id} 完成上线")
else:
Logger.error(
f"{topic_resp} 返回错误码: {error_code}, "
f"{yunfu_data_entity.error_code[error_code]}")
break
if time.time() - userdata.status["start_timestamp"] > TIMEOUT_SECOND: # 超时跳出
Logger.debug(
f"等待回复超时: {speed_ms(userdata.status['start_timestamp'])}ms")
break
# Logger.debug(f"{userdata.status} waiting for {topic_resp} ... ")
time.sleep(SLEEP_TIME)
finally:
# 属性复位
client_1.unsubscribe(topic_resp)
# Logger.debug(f"移除订阅: {topic_resp}")
userdata.set_token(None)
userdata.set_status_remove("response")
# Logger.debug(f"{speed_ms(s)}ms")
else:
Logger.warn("上线请求非法,默认无操作")
else:
Logger.debug("未达到重新上线阈值,默认无操作")
pass
class RegisterService(BaseService):
def handle(self, msg_obj: meian_data_entity.Register):
Logger.debug("RegisterService Handle ...")
status = RegisterTable.update(self.table_handler, msg_obj, self.topic)
if status:
if not DeviceTable.get_device_register_type(self.table_handler, msg_obj.device_id):
Logger.debug("Execute register ...")
project_code = DeviceTable.get_project_code(self.table_handler, msg_obj.device_id)
if project_code:
gateway_id, _ = GatewayTable.get_gateway(self.table_handler, project_code)
if gateway_id is None:
Logger.error("网关不存在")
return -1
topic = f"/jmlink/{gateway_id}/comm/sub/register"
topic_resp = f"/jmlink/{gateway_id}/comm/sub/register_resp"
# 将连接信息(网关&网关secret)和回传信息发送给回传监听进程
s = time.time()
# 获取客户端
client_1 = self.clients[gateway_id][0]
userdata = self.clients[gateway_id][1]
userdata.set_status_add("status", False)
userdata.set_status_add("start_timestamp", time.time())
try:
# 订阅回传
client_1.subscribe(topic_resp)
Logger.debug(f"subscribe topics: {topic_resp}")
# 发布事件
token = generate_token()
userdata.set_token(token)
register_json = json.dumps(yunfu_data_entity.CommRegister(
messageId=token,
params=yunfu_data_entity.RegisterParam(
deviceName=msg_obj.device_id,
displayName=f"美安-{msg_obj.device_id}-{msg_obj.device_position_desc}"
).__dict__
).__dict__)
Logger.debug(register_json)
client_1.publish(topic, register_json)
while True:
if userdata.status["status"]:
# 拿到结果后写入到数据库中更新数据
if "data" in userdata.status["response"].keys():
resp = userdata.status["response"]["data"][0]
if resp["code"] == 0:
DeviceTable.update_aiot_id(self.table_handler, resp["deviceName"],
resp["deviceId"])
Logger.info(f"设备 - {msg_obj.device_id} 完成注册")
# 每次完成注册后刷新订阅信息
Logger.debug(f"刷新订阅列表中 ...")
sub_aiot_id_list = json.loads(
GatewayTable.get_registered_sub_aiot_id(self.table_handler, gateway_id)[
0])
current_topics = set(
[f"/jmlink/{aiot_id}/tml/service/call" for aiot_id in sub_aiot_id_list])
last_topics = set(userdata.topics)
topics_to_subscribe = current_topics - last_topics
topics_to_unsubscribe = last_topics - current_topics
if topics_to_subscribe or topics_to_unsubscribe:
# 订阅缺少的主题
for topic in list(topics_to_subscribe):
client_1.subscribe(topic)
Logger.debug(f"新增订阅: {topic}")
# 取消订阅多余的主题
for topic in list(topics_to_unsubscribe):
client_1.unsubscribe(topic)
Logger.debug(f"移除订阅: {topic}")
else:
Logger.debug(f"订阅列表无变化")
Logger.debug(f"订阅列表刷新完成 ✅")
else:
Logger.error(
f"{topic_resp} 返回错误码: {resp['code']}, "
f"{yunfu_data_entity.error_code[resp['code']]}")
break
if time.time() - userdata.status["start_timestamp"] > TIMEOUT_SECOND: # 超时跳出
Logger.debug(
f"等待回复超时: {speed_ms(userdata.status['start_timestamp'])}ms")
break
# Logger.debug(f"{userdata.status} waiting for {topic_resp} ... ")
time.sleep(SLEEP_TIME) # 实际运行中设为0.1100ms检测一次
finally:
client_1.unsubscribe(topic_resp)
Logger.debug(f"移除订阅: {topic_resp}")
userdata.set_token(None)
userdata.set_status_remove("response")
Logger.debug(f"{speed_ms(s)}ms")
else:
Logger.error("网关配置表中不存在设备关联的项目信息")
else:
Logger.debug("设备已注册过,默认无操作")
class PushRtAccessRecordService(BaseService):
def handle(self, msg_obj: meian_data_entity.PushRtAccessRecord):
Logger.debug("Execute open_event ...")
# 若存在通行记录,但人员信息表中不存在人员信息,则将通行记录本地保存留用
if msg_obj.user_id.startswith("old-"): # 历史遗留在设备中的人员id应当以old-开头进行插入
try:
RecordTable.add(self.table_handler, msg_obj)
Logger.info("通行事件本地记录成功")
return 1
except Exception as e:
Logger.error(f"{type(e).__name__}, {e}")
if logger.DEBUG:
traceback.print_exc()
# 查询子设备ID
aiot_id = DeviceTable.get_aiot_id(self.table_handler, msg_obj.device_id)
if aiot_id is None:
Logger.warn(f"Device - {aiot_id} is not registered")
return -1
topic = f"/jmlink/{aiot_id}/tml/event/post"
topic_resp = f"/jmlink/{aiot_id}/tml/event/post_resp"
project_code = DeviceTable.get_project_code(self.table_handler, msg_obj.device_id)
if project_code:
gateway_id, _ = GatewayTable.get_gateway(self.table_handler, project_code)
# 将连接信息(网关&网关secret)和回传信息发送给回传监听进程
s = time.time()
# 获取客户端
client_1 = self.clients[gateway_id][0]
userdata = self.clients[gateway_id][1]
userdata.set_status_add("status", False)
userdata.set_status_add("start_timestamp", time.time())
# 订阅回传
client_1.subscribe(topic_resp)
Logger.debug(f"subscribe topics: {topic_resp}")
try:
# 数据转换
open_event = yunfu_data_entity.OpenEvent()
open_event.datetime = str(datetime_to_timestamp(msg_obj.time))
open_event.type = yunfu_data_entity.OpenEventType.__dict__[msg_obj.access_mode.upper()].value
open_event.certificate_type = yunfu_data_entity.OpenEventCertificateType.__dict__[
msg_obj.access_mode.upper()].value
# 若是二维码通信根据二维码获取用户信息若是人脸通信根据userid获取用户信息
if msg_obj.access_mode == "qrCode":
user = UserInfoTable.get_user_by_qrcode(self.table_handler, msg_obj.user_id, msg_obj.device_id)
open_event.code = user[0]
open_event.certificate = user[0]
name = user[1]
else:
name = UserInfoTable.get_name(self.table_handler, msg_obj.user_id, msg_obj.device_id)
open_event.code = msg_obj.user_id
open_event.certificate = msg_obj.user_id
open_event.name = name
open_event.check()
token = generate_token()
userdata.set_token(token)
event_json = json.dumps(yunfu_data_entity.EventPost(
messageId=token, eventCode="open_event",
params=open_event.__dict__
).__dict__)
# 发布事件
client_1.publish(topic, event_json)
# 等待回传
while True:
if userdata.status["status"]:
if "code" in userdata.status["response"].keys():
if userdata.status["response"]["code"] != 0:
error_code = userdata.status['response']['errorCode']
Logger.error(f"{topic_resp} 返回错误码: {error_code}, "
f"{meian_data_entity.error_code[error_code]}")
else:
Logger.debug("通行事件推送成功")
break
if time.time() - userdata.status["start_timestamp"] > TIMEOUT_SECOND: # 超时跳出
Logger.error("等待回复超时")
break
# Logger.debug(f"{userdata.status} waiting for {topic_resp} ... ")
time.sleep(SLEEP_TIME)
finally:
# 属性复位
client_1.unsubscribe(topic_resp)
Logger.debug(f"移除订阅: {topic_resp}")
userdata.set_token(None)
userdata.set_status_remove("response")
Logger.debug(f"{speed_ms(s)}ms")
class GetQrCodeService(BaseService):
"""获取访客二维码
1. 将接受的用户信息记录追加或更新到本地DB
2. 根据时间戳和访客id生成一个token字符串作为唯一识别id
3. 将生成的二维码下置给指定设备
4. 将唯一识别id作为二维码字符串返回给平台
"""
def handle(self, msg: dict):
Logger.debug("Execute Get QrCode ...")
code = ErrorCode.SUCCESS
device_id = DeviceTable.get_device_id(self.table_handler, self.topic.split("/tml")[0].split("link/")[1])
project_code = DeviceTable.get_project_code(self.table_handler, device_id)
gateway_id, _ = GatewayTable.get_gateway(self.table_handler, project_code)
success_aiot_list = []
aiot_ids = []
qrcode = None
try:
# 根据时间戳和访客id生成一个token字符串作为唯一识别id
qrcode = generate_time_token(msg["params"]["user_id"])
# 将生成的二维码下置给指定设备
aiot_ids = json.loads(msg["params"]["device_ids"])
s = time.time()
for aiot_id in aiot_ids: # 一般情况 aiot_ids 里应该只有一个值
# 查询对应aiot_id实际的美安设备id和订阅主题
device_id = DeviceTable.get_device_id(self.table_handler, aiot_id)
if device_id is None:
continue
# 将接受的用户信息记录追加或更新到本地DB
user_type = 0 if msg["serviceCode"] != "get_visitor_qrcode" else 1 # 业主0 访客1身份类型在插入后就不会再被更新
device_position_desc = RegisterTable.get_device_position_desc(self.table_handler, device_id)
if user_type == 1 or device_position_desc is None:
floor = 1
else:
floor = get_owner_house_floor(msg["params"]["user_id"], project_code, device_position_desc)
user_info = UserInfo(
user_id=msg["params"]["user_id"],
device_id=device_id,
name=msg["params"]["name"],
user_type=user_type
)
UserInfoTable.update(self.table_handler, user_info)
topic, topic_resp = DeviceTable.get_device_topic(self.table_handler, device_id)
# 获取客户端
client_0 = self.clients["center"][0]
userdata_0 = self.clients["center"][1]
userdata_0.set_status_add("status", False)
userdata_0.set_status_add("start_timestamp", time.time())
try:
Logger.debug(f"subscribe topics: {topic_resp}")
# 构建消息体
token = generate_token() # 正式运行应使用 generate_token()
userdata_0.set_token(token)
userdata_0.set_code("qrCodeDownState")
qrcode_json = json.dumps(meian_data_entity.QrCodeInfo(
dataType="qrCodeInfo",
deviceId=device_id,
token=token,
userId=qrcode,
qrCode=encode_to_base64(qrcode),
floor=floor
).__dict__)
# 发布事件
client_0.publish(topic, qrcode_json)
# 等待回传
while True:
if userdata_0.status["status"]:
if "errorCode" in userdata_0.status["response"].keys():
if userdata_0.status["response"]["errorCode"] != 0:
error_code = userdata_0.status['response']['errorCode']
Logger.error(f"{topic_resp} 返回错误码: {error_code}, "
f"{meian_data_entity.error_code[error_code]}")
else:
Logger.info(f"设备 - {device_id} 二维码 - {qrcode} 完成下置")
UserInfoTable.update_qrcode(self.table_handler,
user_info.user_id, device_id, qrcode)
success_aiot_list.append(aiot_id)
break
if time.time() - userdata_0.status["start_timestamp"] > TIMEOUT_SECOND: # 超时跳出
Logger.error("等待回复超时")
break
# Logger.debug(f"{userdata_0.status} waiting for {topic_resp} ... ")
time.sleep(SLEEP_TIME)
finally:
userdata_0.set_token(None)
userdata_0.set_code(None)
userdata_0.set_status_remove("response")
Logger.debug(f"{speed_ms(s)}ms")
except Exception as e:
Logger.error(f"{type(e).__name__}, {e}")
if type(e).__name__ == "TypeError":
code = ErrorCode.INPUT_TYPE_ERROR
else:
code = ErrorCode.UNKNOWN
# 将唯一识别id作为二维码字符串返回给平台
# 获取客户端
client_1 = self.clients[gateway_id][0]
userdata = self.clients[gateway_id][1]
userdata.set_status_add("status", False)
userdata.set_status_add("start_timestamp", time.time())
try:
if code == ErrorCode.SUCCESS:
if set(success_aiot_list) == set(aiot_ids): # 全部完成下发
Logger.debug("指令完成下发")
elif success_aiot_list: # 完成一部分下发
code = ErrorCode.PART_SUCCESS
Logger.warn(f"部分成功下发: {success_aiot_list}")
UserInfoTable.update_qrcode(self.table_handler, msg["params"]["user_id"], project_code, qrcode)
elif len(success_aiot_list) == 0: # 一个都没下发
code = ErrorCode.FAILURE
Logger.error("无指令完成下发")
aiot_topic = self.topic + "_resp"
return_json = json.dumps(yunfu_data_entity.EventPostResp(
messageId=msg["messageId"],
requestTime=msg["time"],
serviceCode=msg["serviceCode"],
data=yunfu_data_entity.QrCodeResp(code=code.value, message=code.message, qrcode=qrcode).__dict__
).__dict__)
client_1.publish(aiot_topic, return_json)
finally:
userdata.set_token(None)
userdata.set_status_remove("response")
class AddFaceService(BaseService):
def handle(self, msg: dict):
Logger.debug("Execute Add Face ...")
code = ErrorCode.SUCCESS
device_id = DeviceTable.get_device_id(self.table_handler, self.topic.split("/tml")[0].split("link/")[1])
project_code = DeviceTable.get_project_code(self.table_handler, device_id)
gateway_id, _ = GatewayTable.get_gateway(self.table_handler, project_code)
success_aiot_list = []
aiot_ids = json.loads(msg["params"]["device_ids"])
user_id = msg["params"]["user_id"]
face_url = f"{msg['params']['face_url']}?x-oss-process=image/resize,m_pad,h_960,w_540,color_FFFFFF"
try:
# 将人脸下置给指定设备
s = time.time()
for aiot_id in aiot_ids:
# 查询对应aiot_id实际的美安设备id和订阅主题
device_id = DeviceTable.get_device_id(self.table_handler, aiot_id)
if device_id is None:
continue
# 将接受的用户信息记录追加或更新到本地DB
user_info = UserInfo(
user_id=user_id,
device_id=device_id,
name=msg["params"]["name"],
user_type=0
)
UserInfoTable.update(self.table_handler, user_info)
topic, topic_resp = DeviceTable.get_device_topic(self.table_handler, device_id)
# 获取客户端
client_0 = self.clients["center"][0]
userdata_0 = self.clients["center"][1]
userdata_0.set_status_add("status", False)
userdata_0.set_status_add("start_timestamp", time.time())
try:
Logger.debug(f"subscribe topics: {topic_resp}")
# 构建消息体
token = generate_token() # 正式运行应使用 generate_token()
userdata_0.set_token(token)
userdata_0.set_code("faceDownState")
device_position_desc = RegisterTable.get_device_position_desc(self.table_handler, device_id)
if device_position_desc is None:
floor = 1
else:
floor = get_owner_house_floor(user_id, project_code, device_position_desc)
face_json = json.dumps(meian_data_entity.FaceInfo(
dataType="faceInfo",
deviceId=device_id,
token=token,
userId=user_id,
faceUrl=face_url,
floor=floor
).__dict__)
# 发布事件
client_0.publish(topic, face_json)
Logger.debug(f"{face_json} ==> {topic}")
# 等待回传
while True:
if userdata_0.status["status"]:
if "errorCode" in userdata_0.status["response"].keys():
if userdata_0.status["response"]["errorCode"] != 0:
error_code = userdata_0.status['response']['errorCode']
Logger.error(f"{topic_resp} 返回错误码: {error_code}, "
f"{meian_data_entity.error_code[error_code]}")
else:
Logger.info(f"设备 - {device_id} 用户({user_id}) 人脸完成下置")
UserInfoTable.update_face_url(self.table_handler, user_id, device_id, face_url)
success_aiot_list.append(aiot_id)
break
if time.time() - userdata_0.status["start_timestamp"] > TIMEOUT_SECOND: # 超时跳出
Logger.error("等待回复超时")
break
# Logger.debug(f"{userdata_0.status} waiting for {topic_resp} ... ")
time.sleep(SLEEP_TIME)
finally:
userdata_0.set_token(None)
userdata_0.set_code(None)
userdata_0.set_status_remove("response")
Logger.debug(f"{speed_ms(s)}ms")
except Exception as e:
Logger.error(f"{type(e).__name__}, {e}")
if type(e).__name__ == "TypeError":
code = ErrorCode.INPUT_TYPE_ERROR
else:
code = ErrorCode.UNKNOWN
# 将下置结果返回给平台
# 获取客户端
client_1 = self.clients[gateway_id][0]
userdata = self.clients[gateway_id][1]
userdata.set_status_add("status", False)
userdata.set_status_add("start_timestamp", time.time())
try:
if code == ErrorCode.SUCCESS:
if set(success_aiot_list) == set(aiot_ids): # 全部完成下发
Logger.info("指令完成下发")
elif success_aiot_list: # 完成一部分下发
code = ErrorCode.PART_SUCCESS
Logger.warn(f"部分成功下发: {success_aiot_list}")
else: # 一个都没下发
code = ErrorCode.FAILURE
Logger.error("无指令完成下发")
aiot_topic = self.topic + "_resp"
return_json = json.dumps(yunfu_data_entity.EventPostResp(
messageId=msg["messageId"],
requestTime=msg["time"],
serviceCode=msg["serviceCode"],
data=yunfu_data_entity.AddFaceResp(code=code.value, message=code.message).__dict__
).__dict__)
client_1.publish(aiot_topic, return_json)
finally:
userdata.set_token(None)
userdata.set_status_remove("response")
class DeleteFaceService(BaseService):
def handle(self, msg: dict):
Logger.debug("Execute Delete Face ...")
code = ErrorCode.SUCCESS
device_id = DeviceTable.get_device_id(self.table_handler, self.topic.split("/tml")[0].split("link/")[1])
project_code = DeviceTable.get_project_code(self.table_handler, device_id)
gateway_id, _ = GatewayTable.get_gateway(self.table_handler, project_code)
success_aiot_list = []
aiot_ids = json.loads(msg["params"]["device_ids"])
user_id = msg["params"]["user_id"]
try:
s = time.time()
# 将人脸删除指令下发指定设备
for aiot_id in aiot_ids:
# 查询对应aiot_id实际的美安设备id和订阅主题
device_id = DeviceTable.get_device_id(self.table_handler, aiot_id)
if device_id is None:
continue
# 本地DB判断人脸信息是否存在
if UserInfoTable.exists_face_url(self.table_handler, user_id, device_id):
topic, topic_resp = DeviceTable.get_device_topic(self.table_handler, device_id)
# 获取客户端
client_0 = self.clients["center"][0]
userdata_0 = self.clients["center"][1]
userdata_0.set_status_add("status", False)
userdata_0.set_status_add("start_timestamp", time.time())
try:
Logger.debug(f"subscribe topics: {topic_resp}")
# 构建消息体
token = generate_token() # 正式运行应使用 generate_token()
userdata_0.set_token(token)
userdata_0.set_code("deleteUserState")
face_json = json.dumps(meian_data_entity.DeleteUser(
dataType="deleteUser",
deviceId=device_id,
token=token,
userId=user_id
).__dict__)
# 发布事件
client_0.publish(topic, face_json)
# 等待回传
while True:
if userdata_0.status["status"]:
if "errorCode" in userdata_0.status["response"].keys():
if userdata_0.status["response"]["errorCode"] != 0:
error_code = userdata_0.status['response']['errorCode']
Logger.error(f"{topic_resp} 返回错误码: {error_code}, "
f"{meian_data_entity.error_code[error_code]}")
else:
Logger.info(f"设备 - {device_id} 移除用户({user_id})人脸")
UserInfoTable.update_face_url(self.table_handler, user_id, device_id, None)
success_aiot_list.append(aiot_id)
break
if time.time() - userdata_0.status["start_timestamp"] > TIMEOUT_SECOND: # 超时跳出
Logger.error("等待回复超时")
break
# Logger.debug(f"{userdata_0.status} waiting for {topic_resp} ... ")
time.sleep(SLEEP_TIME)
finally:
userdata_0.set_token(None)
userdata_0.set_code(None)
userdata_0.set_status_remove("response")
else:
code = ErrorCode.NEVER_POST_FACE
Logger.warn(f"本地数据库中未发现下发过人脸信息,默认无操作")
Logger.debug(f"{speed_ms(s)}ms")
except Exception as e:
Logger.error(f"{type(e).__name__}, {e}")
if type(e).__name__ == "TypeError":
code = ErrorCode.INPUT_TYPE_ERROR
else:
code = ErrorCode.UNKNOWN
# 将下置结果返回给平台
# 获取客户端
client_1 = self.clients[gateway_id][0]
userdata = self.clients[gateway_id][1]
userdata.set_status_add("status", False)
userdata.set_status_add("start_timestamp", time.time())
try:
if code == ErrorCode.SUCCESS:
if set(success_aiot_list) == set(aiot_ids): # 全部完成下发
Logger.info("指令完成下发")
elif success_aiot_list: # 完成一部分下发
code = ErrorCode.PART_SUCCESS
Logger.warn(f"部分成功下发: {success_aiot_list}")
else: # 一个都没下发
code = ErrorCode.FAILURE
Logger.error("无指令完成下发")
aiot_topic = self.topic + "_resp"
return_json = json.dumps(yunfu_data_entity.EventPostResp(
messageId=msg["messageId"],
requestTime=msg["time"],
serviceCode=msg["serviceCode"],
data=yunfu_data_entity.DelFaceResp(code=code.value, message=code.message).__dict__
).__dict__)
client_1.publish(aiot_topic, return_json)
finally:
userdata.set_token(None)
userdata.set_status_remove("response")
def auto_service(msg: dict, userdata: UserData):
"""跟据dataType自动化加载不同的服务层"""
if "dataType" in msg.keys():
entity_name = msg["dataType"][0].upper() + msg["dataType"][1:]
if entity_name in meian_data_entity.__dict__.keys():
Logger.debug(f"Target service name: {entity_name}")
# 转为服务层数据实体
entity_type = meian_data_entity.__dict__[entity_name]
msg_obj = to_obj(msg, entity_type)
# 构建服务层实体并进行调用
service_name = entity_name + "Service"
if service_name in Services.__dict__.keys():
servicer = Services.__dict__[service_name]()
servicer.set_meta(userdata)
servicer.handle(msg_obj)
elif "serviceCode" in msg.keys() and "params" in msg.keys(): # 校验是aiot平台服务调用下发
service_map = {
"get_owner_qrcode": "GetQrCodeService",
"get_visitor_qrcode": "GetQrCodeService",
"add_face": "AddFaceService",
"del_face": "DeleteFaceService"
}
# 构建服务层实体并进行调用
if msg["serviceCode"] in service_map.keys():
service_name = service_map[msg["serviceCode"]]
servicer = Services.__dict__[service_name]()
servicer.set_meta(userdata)
servicer.handle(msg)
else:
pass