# -*- coding:utf-8 -*-
"""
@Author   : xuxingchen
@Contact  : xuxingchen@sinochem.com
@Desc     : Control logic (route handlers) for the householder management interface
"""
from fastapi import APIRouter, Request, Header, Query

from models.householders import (
    HouseholdersTable,
    GetHouseholdersInfo,
    AddHouseholderInfo,
    UpdateHouseholderInfo,
)
from models.houses import HousesTable
from routers.login import authenticate_token
from utils import logger
from utils.logger import TOKEN_ERROR
from utils.database import get_table_handler
from utils.misc import InvalidException

router = APIRouter()


def _require_valid_token(token: str) -> None:
    """Raise ``InvalidException(TOKEN_ERROR)`` unless ``authenticate_token`` accepts *token*."""
    if not authenticate_token(token):
        raise InvalidException(TOKEN_ERROR)


# NOTE: every route passes ``description=`` explicitly. FastAPI otherwise derives
# the OpenAPI description from the endpoint docstring, so pinning it here keeps
# the published API docs unchanged while the docstrings below are written for
# maintainers.
@router.post("/addHouseholderInfo", summary="新增住户信息", description="新增住户信息")
async def add_householder_info(request: Request, item: AddHouseholderInfo, token: str = Header(...)):
    """Insert a new householder and link it to each entry of ``item.property_info``.

    Args:
        request: Incoming request; only its URL path is used, for the debug log.
        item: Request body carrying ``name``, ``sex``, ``phone`` and
            ``property_info`` (an iterable of ``(room_id, type)`` pairs).
        token: Value of the required ``token`` request header.

    Returns:
        ``{"status": True}`` when ``HouseholdersTable.insert`` returned a truthy
        id, otherwise ``{"status": False}``.

    Raises:
        InvalidException: If ``authenticate_token`` rejects the token.
    """
    _require_valid_token(token)
    th = get_table_handler()

    # Insert the householder row itself.
    householder_id = HouseholdersTable.insert(th, item.name, item.sex, item.phone, "住户")
    if householder_id:
        for room_id, type_ in item.property_info:
            # Insert the householder <-> property association row.
            HouseholdersTable.insert_type(th, room_id, householder_id, type_)
            # Bump the householder count stored on the property.
            # NOTE(review): this is a read-then-write; confirm whether concurrent
            # requests for the same room can lose an increment.
            old_count = HousesTable.get_householder_count(th, room_id)
            HousesTable.update_householder_count(th, room_id, old_count + 1)

    # The guard above treats a falsy id as "nothing was inserted", so report
    # that outcome instead of an unconditional success.
    resp = {"status": bool(householder_id)}
    logger.Logger.debug(f"{request.url.path} {resp}")
    return resp


@router.post("/getHouseholderInfo", summary="获取住户信息", description="获取住户信息")
async def get_householder_info(request: Request, item: GetHouseholdersInfo, token: str = Header(...)):
    """Query householder records using the search and paging fields of *item*.

    Args:
        request: Incoming request; only its URL path is used, for the debug log.
        item: Request body carrying ``search_type``, ``search_key``,
            ``role_type``, ``page`` and ``limit``.
        token: Value of the required ``token`` request header.

    Returns:
        Whatever ``HouseholdersTable.get_householder_info`` returns, unchanged.

    Raises:
        InvalidException: If ``authenticate_token`` rejects the token.
    """
    _require_valid_token(token)
    th = get_table_handler()
    resp = HouseholdersTable.get_householder_info(
        th,
        item.search_type,
        item.search_key,
        item.role_type,
        item.page,
        item.limit,
    )
    logger.Logger.debug(f"{request.url.path} {resp}")
    return resp


@router.get("/getHouseholderInfo", summary="查询指定住户信息", description="查询指定住户信息")
async def get_householder_detail_info(request: Request,
                                      householder_id: int = Query(alias="id"),
                                      token: str = Header(...)):
    """Look up a single householder by id.

    Args:
        request: Incoming request; only its URL path is used, for the debug log.
        householder_id: Householder id, read from the ``id`` query parameter.
        token: Value of the required ``token`` request header.

    Returns:
        Whatever ``HouseholdersTable.get_householder_info_by_id`` returns, unchanged.

    Raises:
        InvalidException: If ``authenticate_token`` rejects the token.
    """
    _require_valid_token(token)
    th = get_table_handler()
    resp = HouseholdersTable.get_householder_info_by_id(th, householder_id)
    logger.Logger.debug(f"{request.url.path} {resp}")
    return resp


@router.post("/updateHouseholderInfo", summary="更新指定住户信息", description="更新指定住户信息")
async def update_householder_info(request: Request, item: UpdateHouseholderInfo, token: str = Header(...)):
    """Update the property information of an existing householder.

    Args:
        request: Incoming request; only its URL path is used, for the debug log.
        item: Request body carrying ``householder_id`` and ``property_info``.
        token: Value of the required ``token`` request header.

    Returns:
        Whatever ``HouseholdersTable.update_householder_info`` returns, unchanged.

    Raises:
        InvalidException: If ``authenticate_token`` rejects the token.
    """
    _require_valid_token(token)
    th = get_table_handler()
    resp = HouseholdersTable.update_householder_info(th, item.householder_id, item.property_info)
    logger.Logger.debug(f"{request.url.path} {resp}")
    return resp


@router.delete("/deleteHouseholderInfo", summary="删除指定住户信息及移除其关联设备授权",
               description="删除指定住户的信息及其关联授权")
async def delete_householder_info(request: Request,
                                  householder_id: int = Query(alias="id"),
                                  token: str = Header(...)):
    """Delete a householder by id via ``HouseholdersTable.delete_householder_info``.

    The route summary states that associated device authorizations are removed
    as well; that happens inside the table helper, not in this handler.

    Args:
        request: Incoming request; only its URL path is used, for the debug log.
        householder_id: Householder id, read from the ``id`` query parameter.
        token: Value of the required ``token`` request header.

    Returns:
        Whatever ``HouseholdersTable.delete_householder_info`` returns, unchanged.

    Raises:
        InvalidException: If ``authenticate_token`` rejects the token.
    """
    _require_valid_token(token)
    th = get_table_handler()
    resp = HouseholdersTable.delete_householder_info(th, householder_id)
    logger.Logger.debug(f"{request.url.path} {resp}")
    return resp