history-project/SCLP/routers/parkinglot_1.py
2024-10-24 09:23:39 +08:00

109 lines
4.0 KiB
Python

# -*- coding:utf-8 -*-
"""
@Author : xuxingchen
@Contact : xuxingchen@sinochem.com
@Desc : 车辆授权界面控制逻辑
"""
from fastapi import APIRouter, Query, Request, Header
from models.parkinglots import ParkinglotsTable, GetCarsInfo, AddCarsCard, UpdateCarsCard
from routers.login import authenticate_token
from utils import logger
from utils.logger import TOKEN_ERROR
from utils.database import get_table_handler
from utils.misc import InvalidException
router = APIRouter()
@router.get("/getCarsInfo", summary="授权车辆信息查询")
async def get_cars_info(request: Request,
page: int = Query(None),
limit: int = Query(None),
token: str = Header(...)):
"""授权车辆信息全量查询"""
if not authenticate_token(token):
raise InvalidException(TOKEN_ERROR)
th = get_table_handler()
resp = ParkinglotsTable.get_cars_info(th, None, None, page, limit)
logger.Logger.debug(f"{request.url.path} {resp}")
return resp
@router.post("/getCarsInfo", summary="授权车辆信息条件查询")
async def get_cars_info(request: Request,
item: GetCarsInfo,
token: str = Header(...)):
"""授权车辆信息全量查询"""
if not authenticate_token(token):
raise InvalidException(TOKEN_ERROR)
th = get_table_handler()
resp = ParkinglotsTable.get_cars_info(th, item.search_type, item.search_key, item.page, item.limit)
logger.Logger.debug(f"{request.url.path} {resp}")
return resp
@router.get("/getCarDetailInfo", summary="详细授权信息查询")
async def get_car_detail_info(request: Request,
auth_id: int = Query(...),
token: str = Header(...)):
"""授权车辆信息全量查询"""
if not authenticate_token(token):
raise InvalidException(TOKEN_ERROR)
th = get_table_handler()
resp = ParkinglotsTable.get_auth_info(th, auth_id)
logger.Logger.debug(f"{request.url.path} {resp}")
return resp
@router.get("/getCarIdAuthStatus", summary="获取车牌号授权状态")
async def get_car_id_auth_status(request: Request,
car_id: str = Query(...),
token: str = Header(...)):
"""获取车牌号授权状态"""
if not authenticate_token(token):
raise InvalidException(TOKEN_ERROR)
th = get_table_handler()
resp = ParkinglotsTable.get_car_id_auth_status(th, car_id)
logger.Logger.debug(f"{request.url.path} {resp}")
return resp
@router.delete("/deleteCarAuthId", summary="删除车辆授权")
async def delete_car_auth_id(request: Request,
auth_id: int = Query(...),
token: str = Header(...)):
"""删除车辆月卡授权"""
if not authenticate_token(token):
raise InvalidException(TOKEN_ERROR)
th = get_table_handler()
resp = ParkinglotsTable.delete_car_auth_id(th, auth_id)
logger.Logger.debug(f"{request.url.path} {resp}")
return resp
@router.post("/addCarAuthInfo", summary="新增车辆授权")
async def add_car_card_auth(request: Request,
item: AddCarsCard,
token: str = Header(...)):
"""新增车辆月卡授权"""
if not authenticate_token(token):
raise InvalidException(TOKEN_ERROR)
th = get_table_handler()
resp = ParkinglotsTable.add_car_card_auth(th, item)
logger.Logger.debug(f"{request.url.path} {resp}")
return resp
@router.post("/updateCarInfo", summary="更新车辆授权")
async def add_car_card_auth(request: Request,
item: UpdateCarsCard,
token: str = Header(...)):
"""更新车辆月卡授权"""
if not authenticate_token(token):
raise InvalidException(TOKEN_ERROR)
th = get_table_handler()
resp = ParkinglotsTable.update_car_card_auth(th, item)
logger.Logger.debug(f"{request.url.path} {resp}")
return resp