78 lines
3.4 KiB
Python
78 lines
3.4 KiB
Python
# -*- coding:utf-8 -*-
|
|
"""
|
|
@Author : xuxingchen
@Contact : xuxingchen@sinochem.com
@Desc : Control logic for the access-device interface
|
|
"""
|
|
from fastapi import APIRouter, Request, Header, Query
|
|
|
|
from models.devices import DevicesTable, AccessDevicesScope, BuildingDevicesScope
|
|
from routers.login import authenticate_token
|
|
from utils import logger
|
|
from utils.logger import TOKEN_ERROR
|
|
from utils.database import get_table_handler
|
|
from utils.misc import InvalidException
|
|
|
|
router = APIRouter()
|
|
|
|
|
|
@router.get("/getAccessDevicesInfo", summary="获取通行设备信息")
async def get_access_devices_info(request: Request,
                                  is_associated: bool = Query(...),
                                  device_name: str = Query(None),
                                  product_name: str = Query(None),
                                  device_mac: str = Query(None),
                                  device_id: int = Query(None),
                                  token: str = Header(...)):
    """Return access-device records, optionally narrowed by one filter.

    At most one filter is forwarded to the table layer. When several are
    supplied, the first one present wins, in this order: ``device_name``,
    ``product_name``, ``device_mac``, ``device_id``.

    Args:
        request: Incoming request; only its URL path is used, for the debug log.
        is_associated: Required query flag, passed through unchanged to
            ``DevicesTable.get_access_devices_info``.
        device_name: Optional text filter; an empty string counts as absent.
        product_name: Optional text filter; an empty string counts as absent.
        device_mac: Optional text filter; an empty string counts as absent.
        device_id: Optional numeric filter; any supplied value, including
            ``0``, is forwarded.
        token: Required request header, checked with ``authenticate_token``.

    Returns:
        Whatever ``DevicesTable.get_access_devices_info`` returns.

    Raises:
        InvalidException: If ``authenticate_token`` rejects ``token``.
    """
    if not authenticate_token(token):
        raise InvalidException(TOKEN_ERROR)
    th = get_table_handler()

    # Text filters in priority order; the first non-empty one is selected.
    text_filters = (
        ("device_name", device_name),
        ("product_name", product_name),
        ("device_mac", device_mac),
    )
    selected = next(({key: value} for key, value in text_filters if value), None)
    if selected is None:
        # Compare against None rather than relying on truthiness, so that an
        # explicitly supplied device_id of 0 is not mistaken for "no filter".
        selected = {} if device_id is None else {"device_id": device_id}

    resp = DevicesTable.get_access_devices_info(th, is_associated, **selected)
    logger.Logger.debug(f"{request.url.path} {resp}")
    return resp
|
|
|
|
|
|
@router.post("/addAccessDevicesAssociateInfo", summary="批量增加大门门禁/大门闸机关联信息")
async def add_access_devices_associate_info(request: Request, item: AccessDevicesScope, token: str = Header(...)):
    """Bulk-add association records for main-gate access-control / turnstile devices.

    Args:
        request: Incoming request; only its URL path is used, for the debug log.
        item: Request body; its ``device_type`` and ``info`` fields are handed
            to ``DevicesTable.add_access_devices``.
        token: Required request header, checked with ``authenticate_token``.

    Returns:
        A dict with a single ``"status"`` key holding the value returned by
        ``DevicesTable.add_access_devices``.

    Raises:
        InvalidException: If ``authenticate_token`` rejects ``token``.
    """
    token_ok = authenticate_token(token)
    if not token_ok:
        raise InvalidException(TOKEN_ERROR)

    handler = get_table_handler()
    resp = {
        "status": DevicesTable.add_access_devices(handler, item.device_type, item.info),
    }
    logger.Logger.debug(f"{request.url.path} {resp}")
    return resp
|
|
|
|
|
|
@router.post("/addBuildingDevicesAssociateInfo", summary="批量增加楼栋门禁关联信息")
async def add_building_devices_associate_info(request: Request, item: BuildingDevicesScope, token: str = Header(...)):
    """Bulk-add association records for building access-control devices.

    Args:
        request: Incoming request; only its URL path is used, for the debug log.
        item: Request body; its ``info`` field is handed to
            ``DevicesTable.add_building_devices``.
        token: Required request header, checked with ``authenticate_token``.

    Returns:
        A dict with a single ``"status"`` key holding the value returned by
        ``DevicesTable.add_building_devices``.

    Raises:
        InvalidException: If ``authenticate_token`` rejects ``token``.
    """
    token_ok = authenticate_token(token)
    if not token_ok:
        raise InvalidException(TOKEN_ERROR)

    handler = get_table_handler()
    # The device-type argument is fixed for this endpoint (building access control).
    resp = {
        "status": DevicesTable.add_building_devices(handler, "楼栋门禁", item.info),
    }
    logger.Logger.debug(f"{request.url.path} {resp}")
    return resp
|
|
|
|
|
|
@router.delete("/deleteAssociatedAccessDeviceInfo", summary="移除完成关联的设备及其已授权的人员信息")
async def delete_access_device_info(request: Request, device_id: int = Query(alias="id"), token: str = Header(...)):
    """Remove an associated device together with its authorized-personnel information.

    Args:
        request: Incoming request; only its URL path is used, for the debug log.
        device_id: Device identifier, read from the ``id`` query parameter.
        token: Required request header, checked with ``authenticate_token``.

    Returns:
        Whatever ``DevicesTable.delete_access_device_info`` returns.

    Raises:
        InvalidException: If ``authenticate_token`` rejects ``token``.
    """
    token_ok = authenticate_token(token)
    if not token_ok:
        raise InvalidException(TOKEN_ERROR)

    handler = get_table_handler()
    resp = DevicesTable.delete_access_device_info(handler, device_id)
    logger.Logger.debug(f"{request.url.path} {resp}")
    return resp
|