2024-10-24 09:23:39 +08:00

324 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding:utf-8 -*-
"""
@Author : xuxingchen
@Contact : xuxingchen@sinochem.com
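@Desc : Methods invoked proactively by the service. Each call connects to MQTT, dispatches the task and handles the device's response, decoupling the server side from MQTT.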
"""
import json
import time
import traceback
from typing import Callable, Optional

from pydantic import BaseModel, Field, field_validator

from config import (BROKER_HOST, BROKER_PORT, BROKER_USERNAME, BROKER_PASSWD, PREFIX_PATH, SUB_PATH,
                    SLEEP_TIME, TIMEOUT_SECOND)
from config import APP_HOST as HOST_IP
from config import IMAGE_SERVER_PORT as PORT
from utils import logger
from utils.misc import generate_captcha_text, UserData, create_mqtt_client, on_publish, on_disconnect, now_tz_datetime
class CallMessage:
    """Envelope for one service-call request.

    Instance attributes double as the wire fields: ``dict()`` exports them
    (minus any that are ``None``) and calling the instance returns the JSON
    text of that export.
    """

    def __init__(self, service_code, params):
        # Identifier the reply is matched against (see the token handling in
        # the MQTT callbacks); presumably a random 16-character string --
        # confirm against generate_captcha_text.
        self.messageId = generate_captcha_text(16)
        self.version = "1.0"
        # Creation time as epoch milliseconds.
        self.time = int(time.time() * 1000)
        self.fromUid = "system"
        self.serviceCode = service_code
        self.params = params

    def dict(self):
        """Return the message as a plain dict, omitting attributes set to None.

        A constant ``"from": "backend"`` entry is always appended last.
        """
        payload = {name: val for name, val in vars(self).items() if val is not None}
        payload["from"] = "backend"
        return payload

    def __call__(self, *args, **kwargs):
        """Return the message serialized as a JSON string (arguments are ignored)."""
        return json.dumps(self.dict())
class AddFaceItem(BaseModel):
    """Parameters of an ``add_face`` service call (grant a user's face on devices).

    Field order is significant: the model's attribute dict is sent as the
    call's ``params``.
    """

    name: str
    user_id: str | int
    # Passed as a string; the demo at the bottom of this file sends a
    # stringified list such as "[2, 321]".
    device_ids: str
    # A factory is required here: a plain ``= now_tz_datetime()`` default is
    # evaluated once when the class is defined, so every instance created
    # later would share that stale import-time timestamp.
    start_date: str = Field(default_factory=now_tz_datetime)
    phone_number: str = ""
    face_url: str
    expire_date: Optional[str] = "2099-01-01T01:01:01.001Z"

    @field_validator("user_id")
    @classmethod
    def check_user_id(cls, value):
        """Normalize an integer ``user_id`` to its string form."""
        if isinstance(value, int):
            return str(value)
        return value
class DelFaceItem(BaseModel):
    """Parameters of a ``del_face`` service call (revoke a user's face on devices)."""

    user_id: str | int
    device_ids: str

    @field_validator("user_id")
    @classmethod
    def check_user_id(cls, value):
        """Return ``user_id`` as a string when it was supplied as an integer."""
        return str(value) if isinstance(value, int) else value
# Display labels (used in log messages) for the ``action`` field of the
# vehicle / fixed-card items: insert -> "add", update -> "update", delete -> "delete".
ACTION: dict[str, str] = {
    "insert": "新增",
    "update": "更新",
    "delete": "删除",
}
class VehicleRegistrationItem(BaseModel):
    """Parameters of a ``vehicle_registration`` service call.

    Field order is significant: the model's attribute dict is sent as the
    call's ``params``.
    """

    # License plate number.
    car_plate_no: str
    registration_id: str
    owner_id: str
    owner_name: str
    registration_time: str
    begin_time: str
    end_time: str = "2099-01-01T01:01:01.001Z"
    # One of "insert", "update", "delete" (the keys of ACTION).
    action: str
    # "0": property owner, "1": visitor.
    registration_type: str
class UpdateFixedCardItem(BaseModel):
    """Parameters of an ``update_fixed_card`` service call (monthly parking card).

    Field order is significant: the model's attribute dict is sent as the
    call's ``params``.
    """

    vehicle_owner_id: str
    vehicle_owner_name: str
    vehicle_owner_phone: str
    # License plate number.
    plate_number: str
    effective_date_begin: str
    effective_date_end: str
    # One of "insert", "update", "delete" (the keys of ACTION).
    action: str
    # Generated by the caller when the card is first inserted.
    card_id: str
def on_connect(client, userdata, _flags, rc):
    """MQTT connect callback: log the result code and subscribe to any stored topics.

    Every topic listed in ``userdata.topics`` is subscribed with QoS 0;
    nothing is subscribed when that collection is empty.
    """
    logger.Logger.debug(f"🔗 Mqtt connection! {{rc: {rc}}} 🔗")
    topics = userdata.topics
    if not topics:
        return
    subscriptions = [(name, 0) for name in topics]
    client.subscribe(subscriptions)
    logger.Logger.debug(f"subscribe topics: {topics}")
def on_message(_client, userdata: UserData, message):
    """MQTT message callback: decode the payload and hand a matching reply to the waiter.

    The payload is decoded as text, stripped of NUL characters and parsed as
    JSON. When ``userdata.token`` is set and equals the message's
    ``messageId``, the parsed message is stored under the ``"response"``
    status key and the ``"status"`` flag is set to True.

    Any exception is logged rather than propagated, so a malformed message
    cannot escape the callback.
    """
    try:
        logger.Logger.debug("💡 接收到新数据,执行 on_message 中 ...", log_path=None)
        msg = json.loads(message.payload.decode().replace("\x00", ""))
        logger.Logger.info(f"{message.topic}: {msg}")
        userdata.set_topic(message.topic)
        token = userdata.token
        # Matching on messageId keeps this compatible with the AIoT platform.
        # Valid JSON that is not an object cannot carry a messageId and is ignored.
        if token and isinstance(msg, dict) and msg.get("messageId") == token:
            # Store the response BEFORE raising the flag: the caller polls
            # "status" from another thread and reads "response" as soon as
            # the flag is True.
            userdata.set_status_add("response", msg)
            userdata.set_status_add("status", True)
    except Exception as e:
        logger.Logger.error(f"{type(e).__name__}, {e}")
        if logger.DEBUG:
            traceback.print_exc()
class ServicesCall:
    """Blocking request/response facade for device service calls over MQTT.

    Each public method publishes a ``CallMessage`` to
    ``/jmlink/{device_id}/tml/service/call`` and polls until the reply with
    the same ``messageId`` arrives on ``.../call_resp`` or ``TIMEOUT_SECOND``
    elapses.

    Every method returns ``(ok, code, message)``:

    * ``(True, 0, "")`` when the reply's ``code`` is 0;
    * ``(False, code, message)`` when the reply carries a non-zero ``code``;
    * ``(False, -1, "等待回复超时")`` on timeout.

    NOTE(review): the pending ``messageId`` is kept in a single
    ``userdata`` token, so calls on one instance presumably must not
    overlap -- confirm before sharing an instance across threads.
    """

    def __init__(self):
        # Create the MQTT connection and start the client's network loop.
        self.userdata = UserData()
        self.client = create_mqtt_client(BROKER_HOST, BROKER_PORT,
                                         self.userdata, on_message, on_publish, on_connect, on_disconnect,
                                         username=BROKER_USERNAME, password=BROKER_PASSWD)
        self.client.loop_start()

    def __del__(self):
        # __init__ may have failed before ``client`` was assigned; guard so
        # finalization does not raise AttributeError in that case.
        client = getattr(self, "client", None)
        if client is not None:
            client.loop_stop()

    def _call_and_wait(self, device_id, service_code, obj, *, label: str, intro: str, success: str,
                       describe_failure: Optional[Callable[[object], str]] = None) -> tuple[bool, int, str]:
        """Publish one service call and block until its reply or the timeout.

        Args:
            device_id: Target device; used to build the request/response topics.
            service_code: Value for the message's ``serviceCode``.
            obj: Item whose attribute dict is sent as the message ``params``.
            label: Name shown in the start/end debug banners.
            intro: Debug message logged before the call is published.
            success: Info message logged when the reply's ``code`` is 0.
            describe_failure: Maps a non-zero reply code to the error log
                message. Defaults to ``"{topic_resp} 返回错误码: {code}"``.

        Returns:
            The ``(ok, code, message)`` tuple described on the class.
        """
        topic = f"/jmlink/{device_id}/tml/service/call"
        topic_resp = f"/jmlink/{device_id}/tml/service/call_resp"
        banner = "=" * 10

        self.userdata.set_topic(topic)
        logger.Logger.debug(f"{banner} 🚩 {label} 🚩 {banner}")
        logger.Logger.debug(intro)

        cm = CallMessage(service_code, obj.__dict__)
        self.userdata.set_message(cm.dict())

        # Arm the wait state before publishing so the reply cannot be missed.
        self.userdata.set_token(cm.messageId)
        self.userdata.set_status_add("status", False)
        self.userdata.set_status_add("start_timestamp", time.time())
        # The timeout itself is measured on the monotonic clock so a
        # wall-clock adjustment cannot shorten or extend the wait.
        deadline = time.monotonic() + TIMEOUT_SECOND

        self.client.subscribe(topic_resp)
        self.client.publish(topic, cm())
        try:
            while True:
                if self.userdata.status["status"]:
                    # Tolerate the flag being visible before the response is stored.
                    try:
                        response = self.userdata.status["response"]
                    except KeyError:
                        response = None
                    # A reply without "code" is not conclusive; keep waiting
                    # until the timeout.
                    if response is not None and "code" in response:
                        error_code = response["code"]
                        if error_code != 0:
                            if describe_failure is None:
                                failure = f"{topic_resp} 返回错误码: {error_code}"
                            else:
                                failure = describe_failure(error_code)
                            logger.Logger.error(failure)
                            # The device may omit "message"; fall back to "".
                            return False, error_code, response.get("message", "")
                        logger.Logger.info(success)
                        return True, 0, ""
                if time.monotonic() > deadline:  # timed out, stop waiting
                    logger.Logger.error("等待回复超时")
                    return False, -1, "等待回复超时"
                time.sleep(SLEEP_TIME)
        finally:
            # Always tear down the per-call state, whatever the outcome.
            self.client.unsubscribe(topic_resp)
            logger.Logger.debug(f"移除订阅: {topic_resp}")
            self.userdata.set_token(None)
            self.userdata.set_status_remove("response")
            logger.Logger.debug(f"{banner} 🏁 {label} 🏁 {banner}")

    def add_face(self, device_id, obj: AddFaceItem) -> tuple[bool, int, str]:
        """Grant a user's face authorization on ``device_id``.

        ``obj.face_url`` is rewritten in place to point at this service's
        image server. For an ``http...`` URL only the last path segment is
        kept as the file name; any other value is used as the file name as is.
        """
        if obj.face_url.startswith("http"):
            face_filename = obj.face_url.split('/')[-1]
        else:
            face_filename = obj.face_url
        obj.face_url = f"http://{HOST_IP}:{PORT}{PREFIX_PATH}/{SUB_PATH}/{face_filename}"
        return self._call_and_wait(
            device_id, "add_face", obj,
            label="AddFace",
            intro=f"{obj.device_ids} 添加用户 {obj.user_id} 的人脸授权",
            success=f"设备 - {obj.device_ids} 用户({obj.user_id}) 人脸完成授权",
            describe_failure=lambda error_code: (
                f"用户 - {obj.user_id} 在设备 - {device_id} 上人脸下置失败,错误码 {error_code}"),
        )

    def del_face(self, device_id, obj: DelFaceItem) -> tuple[bool, int, str]:
        """Revoke a user's face authorization on ``device_id``."""
        return self._call_and_wait(
            device_id, "del_face", obj,
            label="DelFace",
            intro=f"{obj.device_ids} 移除用户 {obj.user_id} 的人脸授权",
            success=f"设备 - {obj.device_ids} 用户({obj.user_id}) 人脸完成移除",
            describe_failure=lambda error_code: (
                f"用户 - {obj.user_id} 在设备 - {device_id} 上人脸移除失败,错误码 {error_code}"),
        )

    def vehicle_registration(self, device_id, obj: VehicleRegistrationItem) -> tuple[bool, int, str]:
        """Insert, update or delete a vehicle registration on ``device_id``.

        Raises:
            KeyError: if ``obj.action`` is not a key of ``ACTION``.
        """
        action_label = ACTION[obj.action]
        return self._call_and_wait(
            device_id, "vehicle_registration", obj,
            label="VehicleRegistration",
            intro=f"{device_id} {action_label}车辆 {obj.car_plate_no} 信息",
            success=f"车辆({obj.car_plate_no}) 信息完成{action_label}",
        )

    def update_fixed_card(self, device_id, obj: UpdateFixedCardItem) -> tuple[bool, int, str]:
        """Insert, update or delete a vehicle's monthly-card record on ``device_id``.

        Raises:
            KeyError: if ``obj.action`` is not a key of ``ACTION``.
        """
        action_label = ACTION[obj.action]
        return self._call_and_wait(
            device_id, "update_fixed_card", obj,
            label="UpdateFixedCard",
            intro=f"{device_id} {action_label}用户 {obj.plate_number} 的月卡信息",
            success=f"车辆({obj.plate_number}) 月卡信息完成修改",
        )
if __name__ == '__main__':
    # Manual smoke test: run each service call once against device 2.
    # Each item is built immediately before its call so time-derived fields
    # are computed at call time.
    demo_device_id = 2
    sc = ServicesCall()

    del_item = DelFaceItem(
        user_id="1234",
        device_ids=str([2]),
    )
    sc.del_face(demo_device_id, del_item)

    add_item = AddFaceItem(
        name="测试",
        user_id="ceshi",
        face_url="http://ip:port/api/v1/faces/person_demo.jpg",
        device_ids=str([2, 321]),
        start_date="2024-01-01T18:00:00.000Z",
    )
    sc.add_face(demo_device_id, add_item)

    registration_item = VehicleRegistrationItem(
        car_plate_no="鲁G95339",
        registration_id="5a12265cf73f261148770c12328bb195",
        owner_id="1244668310399733760",
        owner_name="测试业主",
        registration_time="2024-05-27T16:12:26.895Z",
        begin_time="2024-05-27T16:12:26.895Z",
        end_time="2034-05-27T16:12:26.895Z",
        action="insert",
        registration_type='0',
    )
    sc.vehicle_registration(demo_device_id, registration_item)

    card_item = UpdateFixedCardItem(
        vehicle_owner_id="1244668310399733760",
        vehicle_owner_name="测试业主",
        vehicle_owner_phone="1244668310399733760",
        action="insert",
        card_id=str(int(time.time() * 1000)),
        effective_date_begin=now_tz_datetime(),
        effective_date_end=now_tz_datetime(365 * 10),
        plate_number="鲁G95339",
    )
    sc.update_fixed_card(demo_device_id, card_item)