194 lines
8.5 KiB
Python
194 lines
8.5 KiB
Python
# -*- coding:utf-8 -*-
|
|
"""
|
|
@Author : xuxingchen
|
|
@Contact : xuxingchen@sinochem.com
|
|
@Desc : 通行授权界面控制逻辑
|
|
"""
|
|
import os
|
|
import time
|
|
|
|
from fastapi import APIRouter, Request, Header, File, UploadFile, Query
|
|
|
|
from config import PREFIX_PATH, SUB_PATH
|
|
from config import APP_HOST as HOST_IP
|
|
from config import IMAGE_SERVER_PORT as PORT
|
|
from models.devices import SearchDevicesInfo, DevicesTable
|
|
from models.householders import HouseholdersTable, GetAuthHouseholdersInfo, GetAuthUsersInfo, \
|
|
UpdateHouseholderFace, AddAuthUserInfo, UpdateAuthUserInfo
|
|
from routers.login import authenticate_token
|
|
from utils import logger
|
|
from utils.logger import TOKEN_ERROR
|
|
from utils.database import get_table_handler
|
|
from utils.misc import InvalidException
|
|
|
|
router = APIRouter()
|
|
|
|
|
|
@router.get("/getHouseholdersInfo", summary="授权住户查询")
|
|
async def get_householders_info(request: Request,
|
|
page: int = Query(..., gt=0),
|
|
limit: int = Query(..., gt=0),
|
|
token: str = Header(...)):
|
|
"""获取已授权的住户信息"""
|
|
if not authenticate_token(token):
|
|
raise InvalidException(TOKEN_ERROR)
|
|
th = get_table_handler()
|
|
resp = HouseholdersTable.get_auth_householder_info(th, None, None, None, None,
|
|
page, limit)
|
|
logger.Logger.debug(f"{request.url.path} {resp}")
|
|
return resp
|
|
|
|
|
|
@router.post("/getHouseholdersInfo", summary="授权住户条件查询")
|
|
async def get_householders_info_by_condition(request: Request,
|
|
item: GetAuthHouseholdersInfo,
|
|
token: str = Header(...)):
|
|
"""根据条件获取已授权的住户信息查询"""
|
|
if not authenticate_token(token):
|
|
raise InvalidException(TOKEN_ERROR)
|
|
th = get_table_handler()
|
|
logger.Logger.debug(f"{request.url.path} <- {item.__dict__}")
|
|
resp = HouseholdersTable.get_auth_householder_info(th, item.search_type, item.search_key,
|
|
item.space_type, item.space_id,
|
|
item.page, item.limit)
|
|
logger.Logger.debug(f"{request.url.path} {resp}")
|
|
return resp
|
|
|
|
|
|
@router.get("/getHouseholderDetailInfo", summary="住户授权详细信息查询")
|
|
async def get_householder_detail_info(request: Request, householder_id: int = Query(...), token: str = Header(...)):
|
|
"""获取指定住户授权详细信息查询"""
|
|
if not authenticate_token(token):
|
|
raise InvalidException(TOKEN_ERROR)
|
|
th = get_table_handler()
|
|
resp = HouseholdersTable.get_auth_householder_detail_info(th, householder_id)
|
|
logger.Logger.debug(f"{request.url.path} {resp}")
|
|
return resp
|
|
|
|
|
|
@router.get("/getUsersInfo", summary="通用授权人员信息查询")
|
|
async def get_users_info(request: Request,
|
|
page: int = Query(..., gt=0),
|
|
limit: int = Query(..., gt=0),
|
|
token: str = Header(...)):
|
|
"""获取已授权的通用人员信息"""
|
|
if not authenticate_token(token):
|
|
raise InvalidException(TOKEN_ERROR)
|
|
th = get_table_handler()
|
|
resp = HouseholdersTable.get_auth_users_info(th, None, None, None, page, limit)
|
|
logger.Logger.debug(f"{request.url.path} {resp}")
|
|
return resp
|
|
|
|
|
|
@router.post("/getUsersInfo", summary="通用授权人员信息条件查询")
|
|
async def get_users_info_by_condition(request: Request,
|
|
item: GetAuthUsersInfo,
|
|
token: str = Header(...)):
|
|
"""根据条件获取已授权的通用人员信息"""
|
|
if not authenticate_token(token):
|
|
raise InvalidException(TOKEN_ERROR)
|
|
th = get_table_handler()
|
|
resp = HouseholdersTable.get_auth_users_info(th, item.search_type, item.search_key,
|
|
item.user_type, item.page, item.limit)
|
|
logger.Logger.debug(f"{request.url.path} {resp}")
|
|
return resp
|
|
|
|
|
|
@router.get("/getUserDetailInfo", summary="通用授权人员详细信息查询")
|
|
async def get_user_detail_info(request: Request,
|
|
user_id: int = Query(alias="id"),
|
|
token: str = Header(...)):
|
|
"""获取指定通用人员的详细信息"""
|
|
if not authenticate_token(token):
|
|
raise InvalidException(TOKEN_ERROR)
|
|
th = get_table_handler()
|
|
resp = HouseholdersTable.get_auth_user_info(th, user_id)
|
|
logger.Logger.debug(f"{request.url.path} {resp}")
|
|
return resp
|
|
|
|
|
|
@router.delete("/deleteUserInfo", summary="删除通用授权人员信息及关联设备授权")
|
|
async def get_users_info(request: Request, user_id: int = Query(...), token: str = Header(...)):
|
|
"""通用授权人员删除 & 移除关联设备授权"""
|
|
if not authenticate_token(token):
|
|
raise InvalidException(TOKEN_ERROR)
|
|
th = get_table_handler()
|
|
resp = HouseholdersTable.delete_auth_user_info(th, user_id)
|
|
logger.Logger.debug(f"{request.url.path} {resp}")
|
|
return resp
|
|
|
|
|
|
@router.post("/addUserInfo", summary="新增通用授权人员信息及关联设备授权")
|
|
async def add_user_info(request: Request, item: AddAuthUserInfo, token: str = Header(...)):
|
|
"""新增通用授权人员 & 人脸图像绑定 & 添加关联设备授权"""
|
|
if not authenticate_token(token):
|
|
raise InvalidException(TOKEN_ERROR)
|
|
th = get_table_handler()
|
|
resp = HouseholdersTable.add_auth_user_info(th, item)
|
|
logger.Logger.debug(f"{request.url.path} {resp}")
|
|
return resp
|
|
|
|
|
|
@router.post("/updateUserInfo", summary="更新通用授权人员信息及关联设备授权")
|
|
async def update_user_info(request: Request, item: UpdateAuthUserInfo, token: str = Header(...)):
|
|
"""更新通用授权人员信息 & 人脸图像绑定 & 判断是否需要刷新关联设备授权"""
|
|
if not authenticate_token(token):
|
|
raise InvalidException(TOKEN_ERROR)
|
|
th = get_table_handler()
|
|
resp = HouseholdersTable.update_auth_user_info(th, item)
|
|
logger.Logger.debug(f"{request.url.path} {resp}")
|
|
return resp
|
|
|
|
|
|
@router.post("/uploadHouseholderFace", summary="人脸图片上传")
|
|
async def upload_householder_face(request: Request,
|
|
face_file: UploadFile = File(...),
|
|
token: str = Header(...)):
|
|
"""接收人脸图片上传并保存"""
|
|
if not authenticate_token(token):
|
|
raise InvalidException(TOKEN_ERROR)
|
|
# 检查文件类型是否为图像
|
|
if not face_file.content_type.startswith("image"):
|
|
resp = {"status": False, "message": "仅支持上传图片"}
|
|
else:
|
|
file_content = await face_file.read()
|
|
max_size_mb = 10 # 设置最大允许的文件大小为 10MB
|
|
if face_file.file.seek(0, os.SEEK_END) > max_size_mb * 1024 * 1024:
|
|
resp = {"status": False, "message": f"文件大小不得超过 {max_size_mb}MB"}
|
|
else:
|
|
save_path = f"./data/FaceImages"
|
|
filename = f"_{int(time.time() * 1000)}"
|
|
if not os.path.exists(save_path):
|
|
os.mkdir(save_path)
|
|
file_path = f"{save_path}/{filename}.jpg"
|
|
with open(file_path, "wb") as f:
|
|
f.write(file_content)
|
|
|
|
# 返回成功消息及人脸照片的 URL
|
|
face_url = f"http://{HOST_IP}:{PORT}{PREFIX_PATH}/{SUB_PATH}/{filename}.jpg"
|
|
resp = {"face_url": face_url}
|
|
logger.Logger.debug(f"{request.url.path} {resp}")
|
|
return resp
|
|
|
|
|
|
@router.post("/updateHouseholderFace", summary="更新住户人脸信息")
|
|
async def get_users_info(request: Request, item: UpdateHouseholderFace, token: str = Header(...)):
|
|
"""人脸图像与住户ID绑定 & 刷新关联设备授权"""
|
|
if not authenticate_token(token):
|
|
raise InvalidException(TOKEN_ERROR)
|
|
th = get_table_handler()
|
|
resp = HouseholdersTable.update_householder_face(th, item.id, item.face_url)
|
|
logger.Logger.debug(f"{request.url.path} {resp}")
|
|
return resp
|
|
|
|
|
|
@router.post("/getAssociatedAccessDevicesInfo", summary="通用授权-门禁设备条件查询")
|
|
async def get_associated_access_devices_info(request: Request, item: SearchDevicesInfo, token: str = Header(...)):
|
|
"""已具备关联空间信息的门禁设备条件查询"""
|
|
if not authenticate_token(token):
|
|
raise InvalidException(TOKEN_ERROR)
|
|
th = get_table_handler()
|
|
resp = DevicesTable.get_associated_access_devices_info(th, item.search_type, item.search_key)
|
|
logger.Logger.debug(f"{request.url.path} {resp}")
|
|
return resp
|