86 lines
3.3 KiB
Python
86 lines
3.3 KiB
Python
# -*- coding:utf-8 -*-
|
|
"""
|
|
@Author : xuxingchen
|
|
@Contact : xuxingchen@sinochem.com
|
|
@Desc : 车场管理界面控制逻辑
|
|
"""
|
|
from fastapi import APIRouter, Request, Header, Query
|
|
|
|
from models.parkinglots import ParkinglotsTable, AddParkingLot
|
|
from routers.login import authenticate_token
|
|
from utils import logger
|
|
from utils.logger import TOKEN_ERROR
|
|
from utils.database import get_table_handler
|
|
from utils.misc import InvalidException
|
|
|
|
router = APIRouter()
|
|
|
|
|
|
@router.get("/getParkingLotsInfo", summary="车场信息查询")
async def get_parkinglots_info(request: Request, token: str = Header(...)):
    """获取车场信息"""
    # Endpoint: fetch parking-lot information via ParkinglotsTable.
    # The docstring above is kept verbatim: FastAPI uses a handler's docstring
    # as the operation description in the generated OpenAPI schema.
    token_ok = authenticate_token(token)
    if not token_ok:
        # The `token` request header failed authentication.
        raise InvalidException(TOKEN_ERROR)

    parking_lots = ParkinglotsTable.get_parkinglots_info(get_table_handler())
    # Debug trace: request path followed by the payload about to be returned.
    logger.Logger.debug(f"{request.url.path} {parking_lots}")
    return parking_lots
|
|
|
|
|
|
@router.post("/addParkingLot", summary="添加车场")
async def add_parkinglot(request: Request, item: AddParkingLot, token: str = Header(...)):
    """添加显示信息的车场 & 更新车场编号对应的名字和供应商"""
    # Endpoint: add a parking lot to the displayed set and update the name and
    # factory/vendor name associated with its lot number.
    # The docstring above is kept verbatim: FastAPI uses a handler's docstring
    # as the operation description in the generated OpenAPI schema.
    if not authenticate_token(token):
        # The `token` request header failed authentication.
        raise InvalidException(TOKEN_ERROR)

    handler = get_table_handler()
    result = ParkinglotsTable.update_parkinglot_show(
        handler,
        item.parkinglot_number,
        item.parkinglot_name,
        item.factory_name or '',  # a missing/empty factory name is passed on as ''
        True,  # show flag: True on the add path
    )
    # Debug trace: request path followed by the payload about to be returned.
    logger.Logger.debug(f"{request.url.path} {result}")
    return result
|
|
|
|
|
|
@router.delete("/deleteParkingLot", summary="移除车场")
async def delete_parkinglot(
    request: Request,
    parkinglot_number: str = Query(alias="number", description="车场编号"),
    token: str = Header(...),
):
    """移除显示信息的车场"""
    # Endpoint: remove a parking lot from the displayed set. The lot number
    # arrives as the `number` query parameter.
    # The docstring above is kept verbatim: FastAPI uses a handler's docstring
    # as the operation description in the generated OpenAPI schema.
    authenticated = authenticate_token(token)
    if not authenticated:
        # The `token` request header failed authentication.
        raise InvalidException(TOKEN_ERROR)

    handler = get_table_handler()
    # Same table call as the add endpoint, but with no name / factory name
    # supplied and the show flag set to False.
    outcome = ParkinglotsTable.update_parkinglot_show(
        handler, parkinglot_number, None, None, False
    )
    # Debug trace: request path followed by the payload about to be returned.
    logger.Logger.debug(f"{request.url.path} {outcome}")
    return outcome
|
|
|
|
|
|
@router.get("/getParkingLotsAreaInfo", summary="区域信息查询")
async def get_parkinglots_area_info(request: Request, token: str = Header(...)):
    """区域信息查询"""
    # Endpoint: query area information for the shown parking lots.
    # The docstring above is kept verbatim: FastAPI uses a handler's docstring
    # as the operation description in the generated OpenAPI schema.
    if not authenticate_token(token):
        # The `token` request header failed authentication.
        raise InvalidException(TOKEN_ERROR)

    table_handler = get_table_handler()
    area_info = ParkinglotsTable.get_showed_parkinglot_area_info(table_handler)
    # Debug trace: request path followed by the payload about to be returned.
    logger.Logger.debug(f"{request.url.path} {area_info}")
    return area_info
|
|
|
|
|
|
@router.get("/getParkingLotsChannelInfo", summary="通道信息查询")
async def get_parkinglots_channel_info(request: Request, token: str = Header(...)):
    """通道信息查询"""
    # Endpoint: query channel information for the shown parking lots.
    # The docstring above is kept verbatim: FastAPI uses a handler's docstring
    # as the operation description in the generated OpenAPI schema.
    token_valid = authenticate_token(token)
    if not token_valid:
        # The `token` request header failed authentication.
        raise InvalidException(TOKEN_ERROR)

    channel_info = ParkinglotsTable.get_showed_parkinglot_channel_info(
        get_table_handler()
    )
    # Debug trace: request path followed by the payload about to be returned.
    logger.Logger.debug(f"{request.url.path} {channel_info}")
    return channel_info
|
|
|
|
|
|
@router.get("/getParkingLotsGateInfo", summary="道闸信息查询")
async def get_parkinglots_gate_info(request: Request, token: str = Header(...)):
    """道闸信息查询"""
    # Endpoint: query barrier-gate information for the shown parking lots.
    #
    # Parameters:
    #   request: incoming request; only its URL path is used, for the debug log.
    #   token:   required `token` request header, checked by authenticate_token.
    # Returns:
    #   Whatever ParkinglotsTable.get_showed_parkinglot_gate_info returns.
    # Raises:
    #   InvalidException(TOKEN_ERROR) when the token fails authentication.
    #
    # NOTE: this handler was previously also named
    # `get_parkinglots_channel_info`, which rebound the module-level name of
    # the channel handler and gave both routes the same default route name.
    # It now has its own name; path, method, parameters and response are
    # unchanged.
    #
    # The docstring above is kept verbatim: FastAPI uses a handler's docstring
    # as the operation description in the generated OpenAPI schema.
    if not authenticate_token(token):
        raise InvalidException(TOKEN_ERROR)

    th = get_table_handler()
    resp = ParkinglotsTable.get_showed_parkinglot_gate_info(th)
    # Debug trace: request path followed by the payload about to be returned.
    logger.Logger.debug(f"{request.url.path} {resp}")
    return resp
|