history-project/meian/devices/yunfu_model.py
2024-10-24 09:04:38 +08:00

138 lines
3.2 KiB
Python

# -*- coding:utf-8 -*-
"""
@File : yunfu_model
@Author : xuxingchen
@Version : 1.0.1
@Contact : xuxingchen@sinochem.com
@Desc : Data models for Yunfu (the cloud service)
"""
import os
import time
from typing import Optional
from pydantic import BaseModel, field_validator, Field
from enum import Enum
from utils import get_sign
# Maps numeric result codes to their (Chinese) human-readable messages.
error_code: dict[int, str] = {
    0: "成功",  # success
    401: "参数错误,原因可能是:必填参数缺失或格式错误",  # bad parameter: missing or malformed
    402: "云端错误。请稍后再试",  # cloud-side error, retry later
    403: "mq解析失败",  # MQ message could not be parsed
    404: "发送kafka异常",  # error while sending to Kafka
    602: "通过deviceUuid获取redis中DeviceInfoCacheVO缓存失败",  # Redis cache lookup by deviceUuid failed
    40014: "设备不存在",  # device does not exist
}
class MeianProduct:
    """Product id/secret pair, selected once when the class body executes.

    The ``ENV_TYPE`` environment variable (default ``0``) is converted with
    ``int``; the value ``2`` selects the second pair, any other value the
    first one.
    """

    # NOTE(review): the credentials are hard-coded in source; consider
    # loading them from configuration or a secret store instead.
    if int(os.environ.get("ENV_TYPE", 0)) == 2:
        id: str = "a2ce31a40b********230ccc860e"
        secret: str = "03970a**********28b84e8437"
    else:
        id: str = "4bb9a56bfe9*********2ee99bac6d"
        secret: str = "9b0159********abe57c10ea665ddd"
class OpenEventType(Enum):
    """Numeric codes for the ``type`` of an open event.

    The member names should correspond one-to-one with the ``accessMode``
    values of Meian devices.
    """

    FACE = 8
    QRCODE = 10
    OFFLINE = 11
class OpenEventCertificateType(Enum):
    """Numeric codes for the ``certificate_type`` of an open event.

    The member names should correspond one-to-one with the ``accessMode``
    values of Meian devices.
    """

    FACE = 2
    PASSWORD = 3
    QRCODE = 4
class BaseInfo(BaseModel):
    """Shared base model that adds a helper rejecting attributes left as None."""

    def check(self):
        """Raise ``ValueError`` for the first attribute whose value is ``None``.

        Every entry of ``self.__dict__`` is inspected, so a field that may
        legitimately be empty must not use ``None`` as its default value.

        Raises:
            ValueError: naming the first attribute found to be ``None``.
        """
        for attr, current in self.__dict__.items():
            if current is None:
                raise ValueError(f"{attr} not allowed to be set to None")
class CommRegister(BaseInfo):
    """Message envelope with ``messageId``, ``version``, ``time`` and ``params``."""

    # None marks "not supplied yet"; BaseInfo.check() rejects fields left as None.
    messageId: str = None
    version: str = "1.0"
    # Millisecond Unix timestamp. A default_factory is used so the clock is
    # read for every new instance; a plain default expression would be
    # evaluated only once, when the class body runs at import time.
    time: int = Field(default_factory=lambda: int(time.time() * 1000))
    params: dict = None
class EventPost(BaseInfo):
    """Event message with id, version, timestamp, ``ext``, event code and params."""

    # None marks "not supplied yet"; BaseInfo.check() rejects fields left as None.
    messageId: str = None
    version: str = "1.0"
    # Millisecond Unix timestamp. A default_factory is used so the clock is
    # read for every new instance; a plain default expression would be
    # evaluated only once, when the class body runs at import time.
    time: int = Field(default_factory=lambda: int(time.time() * 1000))
    ext: dict = {
        "ack": 1
    }
    eventCode: str = None
    params: dict = None
class EventPostResp(BaseInfo):
    """Response message; ``messageId``, ``requestTime`` and ``serviceCode`` are required."""

    messageId: str
    requestTime: int
    # Millisecond Unix timestamp. A default_factory is used so the clock is
    # read for every new instance; a plain default expression would be
    # evaluated only once, when the class body runs at import time.
    time: int = Field(default_factory=lambda: int(time.time() * 1000))
    code: int = 0
    message: str = "success"
    serviceCode: str
    # None marks "not supplied yet"; BaseInfo.check() rejects fields left as None.
    data: dict = None
class OpenEvent(BaseInfo):
    """Payload fields describing a single open event.

    Fields defaulting to ``None`` have no usable default; ``BaseInfo.check()``
    raises if any of them is still ``None``.
    """

    # Either an OpenEventType member or a plain integer code.
    type: OpenEventType | int = None
    user_picture: str = "default"
    code: str = "default"
    name: str = None
    # Either an OpenEventCertificateType member or a plain integer code.
    certificate_type: OpenEventCertificateType | int = None
    certificate: str = None
    result: int = 1
    error_code: int = 1
    enter_type: int = 1
    datetime: str = None
class RegisterParam(BaseInfo):
    """Registration parameters whose ``sign`` is always derived from the other fields."""

    productId: str = MeianProduct.id
    # None marks "not supplied yet"; BaseInfo.check() rejects fields left as None.
    deviceName: str = None
    displayName: str = None
    # validate_default=True makes the validator below run even when the
    # caller does not pass ``sign``. The field is declared last so the
    # validator can read productId and deviceName from ``values.data``.
    sign: Optional[str] = Field(default=None, validate_default=True)

    @field_validator("sign")
    @classmethod
    def gen_sign(cls, value, values):
        """Compute the signature from ``deviceName`` and ``productId``.

        The incoming ``value`` is ignored; the result of ``get_sign`` over
        the string ``deviceName<deviceName>productId<productId>`` keyed with
        ``MeianProduct.secret`` is returned instead.
        """
        fields = values.data
        device_name = fields.get("deviceName")
        product_id = fields.get("productId")
        payload = f"deviceName{device_name}productId{product_id}"
        return get_sign(payload, MeianProduct.secret)
class Online(BaseInfo):
    """Online message; ``messageId`` is required, the other fields have defaults."""

    messageId: str
    version: str = "1.0"
    # Millisecond Unix timestamp. A default_factory is used so the clock is
    # read for every new instance; a plain default expression would be
    # evaluated only once, when the class body runs at import time.
    time: int = Field(default_factory=lambda: int(time.time() * 1000))
    # Note: a string flag, not a bool.
    cleanSession: str = "true"
    productModel: str = "default"
    chipModel: str = "default"
    otaVersion: str = ""
class QrCodeResp(BaseInfo):
    """Response carrying a result ``code``, a ``message`` and a ``qrcode`` string.

    All three fields are required.
    """

    code: int
    message: str
    qrcode: str
class AddFaceResp(BaseInfo):
    """Response with a required result ``code`` and ``message``.

    NOTE(review): presumably the reply to an add-face request — confirm
    against the caller.
    """

    code: int
    message: str
class DelFaceResp(BaseInfo):
    """Response with a required result ``code`` and ``message``.

    NOTE(review): presumably the reply to a delete-face request — confirm
    against the caller.
    """

    code: int
    message: str