# -*- coding:utf-8 -*-
"""
@Author   : xuxingchen
@Contact  : xuxingchen@sinochem.com
@Desc     : Control logic for the access-device management endpoints
"""
from fastapi import APIRouter, Request, Header, Query

from models.devices import DevicesTable, AccessDevicesScope, BuildingDevicesScope
from routers.login import authenticate_token
from utils import logger
from utils.logger import TOKEN_ERROR
from utils.database import get_table_handler
from utils.misc import InvalidException

router = APIRouter()


def _table_handler_for(token: str):
    """Check ``token`` and hand back a table handler for the request.

    Raises:
        InvalidException: built from ``TOKEN_ERROR`` when
            ``authenticate_token`` returns a falsy value.
    """
    if authenticate_token(token):
        return get_table_handler()
    raise InvalidException(TOKEN_ERROR)


# NOTE: every route passes ``description`` explicitly so that the published
# API documentation keeps its Chinese text independently of the English
# docstrings on the handlers.
@router.get(
    "/getAccessDevicesInfo",
    summary="获取通行设备信息",
    description="获取通行设备信息",
)
async def get_access_devices_info(request: Request,
                                  is_associated: bool = Query(...),
                                  device_name: str = Query(None),
                                  product_name: str = Query(None),
                                  device_mac: str = Query(None),
                                  device_id: int = Query(None),
                                  token: str = Header(...)):
    """Return access-device information, narrowed by at most one filter.

    Args:
        request: Incoming request; only its URL path is used, for logging.
        is_associated: Required flag forwarded to the table query.
        device_name: Optional device-name filter.
        product_name: Optional product-name filter.
        device_mac: Optional device-MAC filter.
        device_id: Optional device-id filter.
        token: Auth token taken from the request headers.

    Returns:
        Whatever ``DevicesTable.get_access_devices_info`` returns.

    Raises:
        InvalidException: when the token is rejected.
    """
    handler = _table_handler_for(token)

    # Filters are mutually exclusive: only the first truthy one, in this
    # priority order, is forwarded. Empty strings and 0 count as "not given".
    candidates = (
        ("device_name", device_name),
        ("product_name", product_name),
        ("device_mac", device_mac),
        ("device_id", device_id),
    )
    chosen = next(({key: value} for key, value in candidates if value), {})

    resp = DevicesTable.get_access_devices_info(handler, is_associated, **chosen)
    logger.Logger.debug(f"{request.url.path} {resp}")
    return resp


@router.post(
    "/addAccessDevicesAssociateInfo",
    summary="批量增加大门门禁/大门闸机关联信息",
    description="批量增加大门门禁/大门闸机关联信息",
)
async def add_access_devices_associate_info(request: Request,
                                            item: AccessDevicesScope,
                                            token: str = Header(...)):
    """Add association records in bulk for gate access-control / gate turnstile devices.

    Args:
        request: Incoming request; only its URL path is used, for logging.
        item: Request body; its ``device_type`` and ``info`` are forwarded
            to ``DevicesTable.add_access_devices``.
        token: Auth token taken from the request headers.

    Returns:
        ``{"status": <result of DevicesTable.add_access_devices>}``.

    Raises:
        InvalidException: when the token is rejected.
    """
    handler = _table_handler_for(token)
    status = DevicesTable.add_access_devices(handler, item.device_type, item.info)
    resp = {"status": status}
    logger.Logger.debug(f"{request.url.path} {resp}")
    return resp


@router.post(
    "/addBuildingDevicesAssociateInfo",
    summary="批量增加楼栋门禁关联信息",
    description="批量增加楼栋门禁关联信息",
)
async def add_building_devices_associate_info(request: Request,
                                              item: BuildingDevicesScope,
                                              token: str = Header(...)):
    """Add association records in bulk for building access-control devices.

    Args:
        request: Incoming request; only its URL path is used, for logging.
        item: Request body; its ``info`` is forwarded to
            ``DevicesTable.add_building_devices``.
        token: Auth token taken from the request headers.

    Returns:
        ``{"status": <result of DevicesTable.add_building_devices>}``.

    Raises:
        InvalidException: when the token is rejected.
    """
    handler = _table_handler_for(token)
    # The device type is fixed for this endpoint (building access control).
    status = DevicesTable.add_building_devices(handler, "楼栋门禁", item.info)
    resp = {"status": status}
    logger.Logger.debug(f"{request.url.path} {resp}")
    return resp


@router.delete(
    "/deleteAssociatedAccessDeviceInfo",
    summary="移除完成关联的设备及其已授权的人员信息",
    description="移除完成关联的设备及其已授权的人员信息",
)
async def delete_access_device_info(request: Request,
                                    device_id: int = Query(alias="id"),
                                    token: str = Header(...)):
    """Remove an associated device together with its authorized-personnel information.

    Args:
        request: Incoming request; only its URL path is used, for logging.
        device_id: Device identifier, read from the ``id`` query parameter.
        token: Auth token taken from the request headers.

    Returns:
        Whatever ``DevicesTable.delete_access_device_info`` returns.

    Raises:
        InvalidException: when the token is rejected.
    """
    handler = _table_handler_for(token)
    resp = DevicesTable.delete_access_device_info(handler, device_id)
    logger.Logger.debug(f"{request.url.path} {resp}")
    return resp