# -*- coding:utf-8 -*- """ @File : yunfu_model @Author : xuxingchen @Version : 1.0.1 @Contact : xuxingchen@sinochem.com @Desc : 云服的数据模型 """ import os import time from typing import Optional from pydantic import BaseModel, field_validator, Field from enum import Enum from utils import get_sign error_code = { 0: "成功", 401: "参数错误,原因可能是:必填参数缺失或格式错误", 402: "云端错误。请稍后再试", 403: "mq解析失败", 404: "发送kafka异常", 602: "通过deviceUuid获取redis中DeviceInfoCacheVO缓存失败", 40014: "设备不存在" } class MeianProduct: if int(os.environ.get("ENV_TYPE", 0)) != 2: id: str = "4bb9a56bfe9*********2ee99bac6d" secret: str = "9b0159********abe57c10ea665ddd" else: id: str = "a2ce31a40b********230ccc860e" secret: str = "03970a**********28b84e8437" class OpenEventType(Enum): # 此处的取名应当与美安设备的accessMode能够一一对应 FACE = 8 QRCODE = 10 OFFLINE = 11 class OpenEventCertificateType(Enum): # 此处的取名应当与美安设备的accessMode能够一一对应 FACE = 2 PASSWORD = 3 QRCODE = 4 class BaseInfo(BaseModel): def check(self): for attr in self.__dict__.keys(): # if property can be null, default value should not be set to None if self.__dict__[attr] is None: raise ValueError(f"{attr} not allowed to be set to None") class CommRegister(BaseInfo): messageId: str = None version: str = "1.0" time: int = int(time.time() * 1000) params: dict = None class EventPost(BaseInfo): messageId: str = None version: str = "1.0" time: int = int(time.time() * 1000) ext: dict = { "ack": 1 } eventCode: str = None params: dict = None class EventPostResp(BaseInfo): messageId: str requestTime: int time: int = int(time.time() * 1000) code: int = 0 message: str = "success" serviceCode: str data: dict = None class OpenEvent(BaseInfo): type: OpenEventType | int = None user_picture: str = "default" code: str = "default" name: str = None certificate_type: OpenEventCertificateType | int = None certificate: str = None result: int = 1 error_code: int = 1 enter_type: int = 1 datetime: str = None class RegisterParam(BaseInfo): productId: str = MeianProduct.id deviceName: str = None displayName: str = None sign: Optional[str] = Field(default=None, validate_default=True) @field_validator("sign") def gen_sign(cls, value, values): productId = values.data.get("productId") deviceName = values.data.get("deviceName") return get_sign(f"deviceName{deviceName}productId{productId}", MeianProduct.secret) class Online(BaseInfo): messageId: str version: str = "1.0" time: int = int(time.time() * 1000) cleanSession: str = "true" productModel: str = "default" chipModel: str = "default" otaVersion: str = "" class QrCodeResp(BaseInfo): code: int message: str qrcode: str class AddFaceResp(BaseInfo): code: int message: str class DelFaceResp(BaseInfo): code: int message: str