# -*- coding:utf-8 -*-
"""
@Author   : xuxingchen
@Contact  : xuxingchen@sinochem.com
@Desc     : Route handlers (control logic) for the access-authorization UI
"""
import os
import time

from fastapi import APIRouter, Request, Header, File, UploadFile, Query

from config import PREFIX_PATH, SUB_PATH
from config import APP_HOST as HOST_IP
from config import IMAGE_SERVER_PORT as PORT
from models.devices import SearchDevicesInfo, DevicesTable
from models.householders import HouseholdersTable, GetAuthHouseholdersInfo, GetAuthUsersInfo, \
    UpdateHouseholderFace, AddAuthUserInfo, UpdateAuthUserInfo
from routers.login import authenticate_token
from utils import logger
from utils.logger import TOKEN_ERROR
from utils.database import get_table_handler
from utils.misc import InvalidException

router = APIRouter()


def _require_valid_token(token: str) -> None:
    """Raise ``InvalidException(TOKEN_ERROR)`` unless ``authenticate_token`` accepts *token*."""
    if not authenticate_token(token):
        raise InvalidException(TOKEN_ERROR)


@router.get("/getHouseholdersInfo", summary="授权住户查询")
async def get_householders_info(request: Request,
                                page: int = Query(..., gt=0),
                                limit: int = Query(..., gt=0),
                                token: str = Header(...)):
    """Return one page of authorized householders, without any filter.

    Raises InvalidException(TOKEN_ERROR) when the token header is rejected.
    """
    _require_valid_token(token)
    th = get_table_handler()
    # All four filter arguments are None: this endpoint only paginates.
    resp = HouseholdersTable.get_auth_householder_info(th, None, None, None, None, page, limit)
    logger.Logger.debug(f"{request.url.path} {resp}")
    return resp


@router.post("/getHouseholdersInfo", summary="授权住户条件查询")
async def get_householders_info_by_condition(request: Request,
                                             item: GetAuthHouseholdersInfo,
                                             token: str = Header(...)):
    """Return one page of authorized householders matching the filters in the request body.

    Raises InvalidException(TOKEN_ERROR) when the token header is rejected.
    """
    _require_valid_token(token)
    th = get_table_handler()
    logger.Logger.debug(f"{request.url.path} <- {item.__dict__}")
    resp = HouseholdersTable.get_auth_householder_info(
        th,
        item.search_type,
        item.search_key,
        item.space_type,
        item.space_id,
        item.page,
        item.limit,
    )
    logger.Logger.debug(f"{request.url.path} {resp}")
    return resp


@router.get("/getHouseholderDetailInfo", summary="住户授权详细信息查询")
async def get_householder_detail_info(request: Request,
                                      householder_id: int = Query(...),
                                      token: str = Header(...)):
    """Return the detailed authorization information of the given householder.

    Raises InvalidException(TOKEN_ERROR) when the token header is rejected.
    """
    _require_valid_token(token)
    th = get_table_handler()
    resp = HouseholdersTable.get_auth_householder_detail_info(th, householder_id)
    logger.Logger.debug(f"{request.url.path} {resp}")
    return resp


@router.get("/getUsersInfo", summary="通用授权人员信息查询")
async def get_users_info(request: Request,
                         page: int = Query(..., gt=0),
                         limit: int = Query(..., gt=0),
                         token: str = Header(...)):
    """Return one page of authorized general users, without any filter.

    Raises InvalidException(TOKEN_ERROR) when the token header is rejected.
    """
    _require_valid_token(token)
    th = get_table_handler()
    # All three filter arguments are None: this endpoint only paginates.
    resp = HouseholdersTable.get_auth_users_info(th, None, None, None, page, limit)
    logger.Logger.debug(f"{request.url.path} {resp}")
    return resp


@router.post("/getUsersInfo", summary="通用授权人员信息条件查询")
async def get_users_info_by_condition(request: Request,
                                      item: GetAuthUsersInfo,
                                      token: str = Header(...)):
    """Return one page of authorized general users matching the filters in the request body.

    Raises InvalidException(TOKEN_ERROR) when the token header is rejected.
    """
    _require_valid_token(token)
    th = get_table_handler()
    resp = HouseholdersTable.get_auth_users_info(
        th,
        item.search_type,
        item.search_key,
        item.user_type,
        item.page,
        item.limit,
    )
    logger.Logger.debug(f"{request.url.path} {resp}")
    return resp


@router.get("/getUserDetailInfo", summary="通用授权人员详细信息查询")
async def get_user_detail_info(request: Request,
                               user_id: int = Query(..., alias="id"),
                               token: str = Header(...)):
    """Return the detailed information of the given general user.

    The user id is read from the ``id`` query parameter.
    Raises InvalidException(TOKEN_ERROR) when the token header is rejected.
    """
    _require_valid_token(token)
    th = get_table_handler()
    resp = HouseholdersTable.get_auth_user_info(th, user_id)
    logger.Logger.debug(f"{request.url.path} {resp}")
    return resp


@router.delete("/deleteUserInfo", summary="删除通用授权人员信息及关联设备授权")
async def delete_user_info(request: Request,
                           user_id: int = Query(...),
                           token: str = Header(...)):
    """Delete a general authorized user and remove the associated device authorizations.

    Raises InvalidException(TOKEN_ERROR) when the token header is rejected.
    """
    # Previously named ``get_users_info`` (copy/paste), which shadowed the GET handler.
    _require_valid_token(token)
    th = get_table_handler()
    resp = HouseholdersTable.delete_auth_user_info(th, user_id)
    logger.Logger.debug(f"{request.url.path} {resp}")
    return resp


@router.post("/addUserInfo", summary="新增通用授权人员信息及关联设备授权")
async def add_user_info(request: Request,
                        item: AddAuthUserInfo,
                        token: str = Header(...)):
    """Add a general authorized user, bind the face image and add the associated device authorizations.

    Raises InvalidException(TOKEN_ERROR) when the token header is rejected.
    """
    _require_valid_token(token)
    th = get_table_handler()
    resp = HouseholdersTable.add_auth_user_info(th, item)
    logger.Logger.debug(f"{request.url.path} {resp}")
    return resp


@router.post("/updateUserInfo", summary="更新通用授权人员信息及关联设备授权")
async def update_user_info(request: Request,
                           item: UpdateAuthUserInfo,
                           token: str = Header(...)):
    """Update a general authorized user, bind the face image and refresh device authorizations if needed.

    Raises InvalidException(TOKEN_ERROR) when the token header is rejected.
    """
    _require_valid_token(token)
    th = get_table_handler()
    resp = HouseholdersTable.update_auth_user_info(th, item)
    logger.Logger.debug(f"{request.url.path} {resp}")
    return resp


@router.post("/uploadHouseholderFace", summary="人脸图片上传")
async def upload_householder_face(request: Request,
                                  face_file: UploadFile = File(...),
                                  token: str = Header(...)):
    """Receive an uploaded face image, save it under ./data/FaceImages and return its URL.

    Returns ``{"face_url": ...}`` on success, or ``{"status": False, "message": ...}``
    when the upload is not an image or exceeds the size limit.
    Raises InvalidException(TOKEN_ERROR) when the token header is rejected.
    """
    _require_valid_token(token)
    max_size_mb = 10  # maximum accepted upload size, in MB
    max_size_bytes = max_size_mb * 1024 * 1024

    # content_type may be None when the client omits it; treat that as "not an image".
    content_type = face_file.content_type or ""
    if not content_type.startswith("image"):
        resp = {"status": False, "message": "仅支持上传图片"}
    else:
        # Read one byte past the limit: enough to detect an oversized upload
        # without pulling an arbitrarily large file into memory.
        file_content = await face_file.read(max_size_bytes + 1)
        if len(file_content) > max_size_bytes:
            resp = {"status": False, "message": f"文件大小不得超过 {max_size_mb}MB"}
        else:
            save_path = "./data/FaceImages"
            # NOTE(review): the name is a millisecond timestamp, so two uploads in the
            # same millisecond would overwrite each other — confirm this is acceptable.
            filename = f"_{int(time.time() * 1000)}"
            # Creates missing parent directories and tolerates concurrent creation.
            os.makedirs(save_path, exist_ok=True)
            file_path = f"{save_path}/{filename}.jpg"
            with open(file_path, "wb") as f:
                f.write(file_content)
            # Return the URL under which the face image is expected to be served.
            face_url = f"http://{HOST_IP}:{PORT}{PREFIX_PATH}/{SUB_PATH}/{filename}.jpg"
            resp = {"face_url": face_url}
    logger.Logger.debug(f"{request.url.path} {resp}")
    return resp


@router.post("/updateHouseholderFace", summary="更新住户人脸信息")
async def update_householder_face(request: Request,
                                  item: UpdateHouseholderFace,
                                  token: str = Header(...)):
    """Bind a face image URL to a householder id and refresh the associated device authorizations.

    Raises InvalidException(TOKEN_ERROR) when the token header is rejected.
    """
    # Previously named ``get_users_info`` (copy/paste), which shadowed the GET handler.
    _require_valid_token(token)
    th = get_table_handler()
    resp = HouseholdersTable.update_householder_face(th, item.id, item.face_url)
    logger.Logger.debug(f"{request.url.path} {resp}")
    return resp


@router.post("/getAssociatedAccessDevicesInfo", summary="通用授权-门禁设备条件查询")
async def get_associated_access_devices_info(request: Request,
                                             item: SearchDevicesInfo,
                                             token: str = Header(...)):
    """Search access-control devices that already have associated space information.

    Raises InvalidException(TOKEN_ERROR) when the token header is rejected.
    """
    _require_valid_token(token)
    th = get_table_handler()
    resp = DevicesTable.get_associated_access_devices_info(th, item.search_type, item.search_key)
    logger.Logger.debug(f"{request.url.path} {resp}")
    return resp