history-project/SCLP/routers/access_devices_0.py
2024-10-24 09:23:39 +08:00

78 lines
3.4 KiB
Python

# -*- coding:utf-8 -*-
"""
@Author : xuxingchen
@Contact : xuxingchen@sinochem.com
@Desc : 通行设备界面控制逻辑
"""
from fastapi import APIRouter, Request, Header, Query
from models.devices import DevicesTable, AccessDevicesScope, BuildingDevicesScope
from routers.login import authenticate_token
from utils import logger
from utils.logger import TOKEN_ERROR
from utils.database import get_table_handler
from utils.misc import InvalidException
router = APIRouter()
@router.get("/getAccessDevicesInfo", summary="获取通行设备信息")
async def get_access_devices_info(request: Request,
is_associated: bool = Query(...),
device_name: str = Query(None),
product_name: str = Query(None),
device_mac: str = Query(None),
device_id: int = Query(None),
token: str = Header(...)):
"""获取通行设备信息"""
if not authenticate_token(token):
raise InvalidException(TOKEN_ERROR)
th = get_table_handler()
if device_name:
resp = DevicesTable.get_access_devices_info(th, is_associated, device_name=device_name)
elif product_name:
resp = DevicesTable.get_access_devices_info(th, is_associated, product_name=product_name)
elif device_mac:
resp = DevicesTable.get_access_devices_info(th, is_associated, device_mac=device_mac)
elif device_id:
resp = DevicesTable.get_access_devices_info(th, is_associated, device_id=device_id)
else:
resp = DevicesTable.get_access_devices_info(th, is_associated)
logger.Logger.debug(f"{request.url.path} {resp}")
return resp
@router.post("/addAccessDevicesAssociateInfo", summary="批量增加大门门禁/大门闸机关联信息")
async def add_access_devices_associate_info(request: Request, item: AccessDevicesScope, token: str = Header(...)):
"""批量增加大门门禁/大门闸机关联信息"""
if not authenticate_token(token):
raise InvalidException(TOKEN_ERROR)
th = get_table_handler()
_callback = DevicesTable.add_access_devices(th, item.device_type, item.info)
resp = {"status": _callback}
logger.Logger.debug(f"{request.url.path} {resp}")
return resp
@router.post("/addBuildingDevicesAssociateInfo", summary="批量增加楼栋门禁关联信息")
async def add_building_devices_associate_info(request: Request, item: BuildingDevicesScope, token: str = Header(...)):
"""批量增加楼栋门禁关联信息"""
if not authenticate_token(token):
raise InvalidException(TOKEN_ERROR)
th = get_table_handler()
_callback = DevicesTable.add_building_devices(th, "楼栋门禁", item.info)
resp = {"status": _callback}
logger.Logger.debug(f"{request.url.path} {resp}")
return resp
@router.delete("/deleteAssociatedAccessDeviceInfo", summary="移除完成关联的设备及其已授权的人员信息")
async def delete_access_device_info(request: Request, device_id: int = Query(alias="id"), token: str = Header(...)):
"""移除完成关联的设备及其已授权的人员信息"""
if not authenticate_token(token):
raise InvalidException(TOKEN_ERROR)
th = get_table_handler()
resp = DevicesTable.delete_access_device_info(th, device_id)
logger.Logger.debug(f"{request.url.path} {resp}")
return resp