# -*- coding:utf-8 -*- """ @Author : xuxingchen @Contact : xuxingchen@sinochem.com @Desc : 住户信息表 & 住户-房产关联表 增&删&改&查 """ import csv import os import re import shutil from typing import Union, Optional, List from pydantic import BaseModel, field_validator from config import PREFIX_PATH, FACES_DIR_PATH, SUB_PATH from config import APP_HOST as HOST_IP from config import IMAGE_SERVER_PORT as PORT from device.call import ServicesCall, DelFaceItem, AddFaceItem from models.devices import DevicesTable from models.houses import HousesTable from utils import logger from utils.database import BaseTable, get_table_handler from utils.misc import now_datetime_second, InvalidException, is_image_valid, get_file_md5, encrypt_number, \ decrypt_number ROLE_TYPES = ["业主", "亲属", "租户"] class HouseholdersInfo(BaseModel): householder_id: int name: str sex: str phone: str rooms: Union[str, list] add_datetime: str @field_validator("rooms") def convert_rooms(cls, value): return [i.strip() for i in value.split(",")] class HouseholderDetailInfo(BaseModel): householder_id: int name: str phone: str face_url: str rooms: list authorized_devices: list class AuthHouseholdersInfo(BaseModel): householder_id: int name: str phone: str rooms: Union[str, list] @field_validator("rooms") def convert_rooms(cls, value): return [i.strip() for i in value.split(",")] class AuthUsersInfo(BaseModel): user_id: int name: str sex: str phone: str card_type: str card_id: str user_type: str start_datetime: str expire_datetime: str class AuthUsersDetailInfo(AuthUsersInfo): face_url: str authorized_devices: List[dict] class AddAuthUserInfo(BaseModel): name: str sex: str phone: str card_type: str card_id: str user_type: str start_datetime: str expire_datetime: str face_url: str authorized_devices: List[int] @field_validator("sex") def check_sex(cls, value): types = ["男", "女"] if value not in types: raise InvalidException(f"请提供正确的sex类型 {types}") else: return value @field_validator('phone') def check_phone(cls, value): pattern = re.compile(r'^(13[0-9]|14[01456879]|15[0-35-9]|16[2567]|17[0-8]|18[0-9]|19[0-35-9])\d{8}$') if pattern.search(value) is None: raise InvalidException("请提供正确的手机号码") th = get_table_handler() if HouseholdersTable.exists_user_phone(th, value): raise InvalidException("手机号码已存在,请勿重复提交") return value @field_validator("user_type") def check_user_type(cls, value): types = ["员工", "访客", "其他"] if value not in types: raise InvalidException(f"请提供正确的user_type类型 {types}") else: return value @field_validator("face_url") def check_face_url(cls, value, values): filename = value.split('/')[-1] file_path = str(os.path.join(FACES_DIR_PATH, filename)) if is_image_valid(file_path): if filename.startswith('_'): new_filename = \ f"{values.data.get('name')}{encrypt_number(values.data.get('phone'))}.{filename.split('.')[-1]}" shutil.copy(file_path, f"{FACES_DIR_PATH}/{new_filename}") os.remove(file_path) return new_filename return filename else: raise InvalidException(f"地址或图片无效,请提供有效的图片地址 {value}") @field_validator("authorized_devices") def check_authorized_devices(cls, value): if len(value) > 0: return value else: raise InvalidException("请提供至少一个门禁设备权限") class UpdateAuthUserInfo(AddAuthUserInfo): user_id: int phone: str @field_validator("user_id") def check_user_id(cls, value): th = get_table_handler() if HouseholdersTable.exists_user_id(th, value): return value else: raise InvalidException("user_id 不存在") @field_validator('phone') def check_phone(cls, value): pattern = re.compile(r'^(13[0-9]|14[01456879]|15[0-35-9]|16[2567]|17[0-8]|18[0-9]|19[0-35-9])\d{8}$') if pattern.search(value) is None: raise InvalidException("请提供正确的手机号码") return value class AddHouseholderInfo(BaseModel): name: str sex: str phone: str property_info: List[tuple[str, str]] @field_validator('sex') def check_sex(cls, value): if value not in ["男", "女"]: raise InvalidException("请提供正确的性别类型") return value @field_validator('phone') def check_phone(cls, value): pattern = re.compile(r'^(13[0-9]|14[01456879]|15[0-35-9]|16[2567]|17[0-8]|18[0-9]|19[0-35-9])\d{8}$') if pattern.search(value) is None: raise InvalidException("请提供正确的手机号码") th = get_table_handler() if HouseholdersTable.exists_householder_phone(th, value): raise InvalidException("住户手机号码已存在,请勿重复提交") return value @field_validator('property_info') def check_property_info(cls, value): th = get_table_handler() room_ids = [] for item in value: if not HousesTable.exists(th, item[0]): raise InvalidException("对应房产不存在") if item[1] not in ROLE_TYPES: raise InvalidException("请提供正确的身份类型") room_ids.append(item[0]) if len(set(room_ids)) != len(room_ids): raise InvalidException("不能存在重复房产") return value class GetHouseholdersInfo(BaseModel): search_type: Optional[str] = None search_key: Optional[str] = None role_type: str = "全部" limit: Optional[int] = None page: Optional[int] = None @field_validator("search_type") def check_search_type(cls, value): types = { "姓名": "name", "电话号码": "phone" } if value not in ["姓名", "电话号码"]: raise InvalidException("请提供正确的关键字类型") else: return types[value] @field_validator("search_key") def check_search_key(cls, value, values): if value and values.data.get("search_type", None): return value else: raise InvalidException("search 参数缺失") @field_validator("page") def check_page(cls, value, values): if (value is not None and value > 0 and values.data.get("limit", None) is not None and values.data.get("limit") != 0): return value else: raise InvalidException("page 参数缺失或异常") @field_validator("role_type") def check_role_type(cls, value): if value not in ROLE_TYPES + ["全部"]: raise InvalidException("请提供正确的身份类型") else: return value class GetAuthHouseholdersInfo(BaseModel): search_type: Optional[str] = None search_key: Optional[str] = None space_type: Optional[str] = None space_id: Optional[str] = None limit: Optional[int] = None page: Optional[int] = None @field_validator("search_type") def check_search_type(cls, value): types = { "住户姓名": "name", "手机号码": "phone" } if value not in types.keys(): raise InvalidException(f"请提供正确的关键字类型 {list(types.keys())}") else: return types[value] @field_validator("search_key") def check_search_key(cls, value, values): if (value or value.strip() == "") and values.data.get("search_type", None): if value.strip() == "": value = None return value else: raise InvalidException("search 参数缺失") @field_validator("space_type") def check_space_type(cls, value): types = { "区域": "name_id", "楼栋": "building_id", "单元": "unit_id", "楼层": "floor_id", "房间号": "room_id" } if value not in types.keys(): raise InvalidException(f"请提供正确的关联房产类型 {list(types.keys())}") else: return types[value] @field_validator("space_id") def check_space_id(cls, value, values): if value and values.data.get("space_type", None): return value else: raise InvalidException("space 参数缺失") @field_validator("page") def check_page(cls, value, values): if (value is not None and value > 0 and values.data.get("limit", None) is not None and values.data.get("limit") != 0): return value else: raise InvalidException("page 参数缺失或异常") class GetAuthUsersInfo(BaseModel): search_type: Optional[str] = None search_key: Optional[str] = None user_type: Optional[str] = None limit: Optional[int] = None page: Optional[int] = None @field_validator("search_type") def check_search_type(cls, value): types = { "姓名": "name", "手机号码": "phone" } if value not in types.keys(): raise InvalidException(f"请提供正确的关键字类型 {list(types.keys())}") else: return types[value] @field_validator("search_key") def check_search_key(cls, value, values): if value and values.data.get("search_type", None): return value else: raise InvalidException("search 参数缺失") @field_validator("user_type") def check_space_type(cls, value): types = { "员工": "员工", "访客": "访客", "其他": "其他" } if value not in types.keys(): raise InvalidException(f"请提供正确的人员类型 {list(types.keys())}") else: return types[value] @field_validator("page") def check_page(cls, value, values): if (value is not None and value > 0 and values.data.get("limit", None) is not None and values.data.get("limit") != 0): return value else: raise InvalidException("page 参数缺失或异常") class UpdateHouseholderInfo(BaseModel): householder_id: int property_info: List[tuple[str, str]] @field_validator('property_info') def check_property_info(cls, value): th = get_table_handler() for item in value: if not HousesTable.exists(th, item[0]): raise InvalidException("对应房产不存在") if item[1] not in ROLE_TYPES: raise InvalidException("请提供正确的身份类型") return value class UpdateHouseholderFace(BaseModel): id: int face_url: str @field_validator("id") def check_id(cls, value): th = get_table_handler() if HouseholdersTable.exists_id(th, value): return value else: raise InvalidException(f"无效ID {value}") @field_validator("face_url") def check_face_url(cls, value, values): filename = value.split('/')[-1] file_path = str(os.path.join(FACES_DIR_PATH, filename)) if is_image_valid(file_path): # 若为临时图片,则进行规则命名 if filename.startswith('_'): th = get_table_handler() name_phone = HouseholdersTable.get_householder_np_by_id(th, values.data.get('id')) new_filename = f"{name_phone['name']}{name_phone['phone']}.{filename.split('.')[-1]}" shutil.copy(file_path, f"{FACES_DIR_PATH}/{new_filename}") os.remove(file_path) return new_filename return filename else: raise InvalidException(f"地址或图片无效,请提供有效的图片地址 {value}") class HouseholdersTable(BaseTable): @staticmethod def check(table_handler: BaseTable): """检测是否存在当前表""" # householders 住户信息表 table_handler.query("SELECT name FROM sqlite_master WHERE type='table' AND name='householders'") if table_handler.cursor.fetchone() is None: table_handler.execute( """ CREATE TABLE householders ( householder_id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT, sex TEXT, phone TEXT, face_url TEXT DEFAULT '', face_md5 TEXT, type TEXT, user_type TEXT, card_type TEXT, card_id TEXT, auth_start TEXT, auth_expire TEXT, add_datetime TEXT, update_datetime TEXT, UNIQUE (phone, type) ) """ ) init_config_path = os.path.join(os.path.dirname(os.path.abspath("__file__")), "data/InitialData/householders.csv") if os.path.exists(init_config_path): with open(init_config_path, newline='', encoding='utf8') as csvfile: csvreader = csv.reader(csvfile) head = next(csvreader) data = [] if len(head) == 11: for row in csvreader: householder_id = row[0].strip() name = row[1].strip() sex = row[2].strip() phone = encrypt_number(row[3].strip()) face_url = row[4].strip() if face_url and os.path.exists(f"{FACES_DIR_PATH}/{face_url}"): face_md5 = get_file_md5(f"{FACES_DIR_PATH}/{face_url}") else: face_md5 = None _type = row[5].strip() if _type != "住户": user_type = row[6].strip() card_type = row[7].strip() card_id = row[8].strip() auth_start = row[9].strip() auth_expire = row[10].strip() else: user_type, card_type, card_id, auth_start, auth_expire = None, None, None, None, None data.append((householder_id, name, sex, phone, face_url, face_md5, _type, user_type, card_type, card_id, auth_start, auth_expire, now_datetime_second(), now_datetime_second())) table_handler.executemany( """ INSERT OR IGNORE INTO householders (householder_id, name, sex, phone, face_url, face_md5, type, user_type, card_type, card_id, auth_start, auth_expire, add_datetime, update_datetime) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, data ) # householders_type 住户-房产关联表 table_handler.query("SELECT name FROM sqlite_master WHERE type='table' AND name='householders_type'") if table_handler.cursor.fetchone() is None: table_handler.execute( """ CREATE TABLE householders_type ( householder_id INT, room_id TEXT, type TEXT, update_datetime TEXT, UNIQUE (householder_id, room_id) ) """ ) init_config_path = os.path.join(os.path.dirname(os.path.abspath("__file__")), "data/InitialData/householders_type.csv") if os.path.exists(init_config_path): with open(init_config_path, newline='', encoding='utf8') as csvfile: csvreader = csv.reader(csvfile) head = next(csvreader) data = [] count_info = {} if len(head) == 3: for row in csvreader: householder_id = row[0].strip() room_id = row[1].strip() _type = row[2].strip() if not HouseholdersTable.exists_householder_type(table_handler, householder_id, room_id): if room_id in count_info.keys(): count_info[room_id] += 1 else: count_info[room_id] = 1 data.append((int(householder_id), room_id, _type, now_datetime_second())) table_handler.executemany( """ INSERT INTO householders_type (householder_id, room_id, type, update_datetime) VALUES (?, ?, ?, ?) ON CONFLICT (householder_id, room_id) DO NOTHING """, data ) for _id, _count in count_info.items(): old_count = HousesTable.get_householder_count(table_handler, _id) HousesTable.update_householder_count(table_handler, _id, old_count + _count) @staticmethod def insert(table_handler: BaseTable, name: str, sex: str, phone: str, _type: str): try: table_handler.execute( """ INSERT OR IGNORE INTO householders (name, sex, phone, type, add_datetime, update_datetime) VALUES (?, ?, ?, ?, ?, ?) """, (name, sex, encrypt_number(phone), _type, now_datetime_second(), now_datetime_second()) ) finally: table_handler.query( """ SELECT householder_id FROM householders WHERE phone = ? """, (encrypt_number(phone),) ) res = table_handler.cursor.fetchall() if res: return res[0][0] else: return None @staticmethod def insert_type(table_handler: BaseTable, room_id: str, householder_id: int, _type: str): table_handler.execute( """ INSERT INTO householders_type (householder_id, room_id, type, update_datetime) VALUES (?, ?, ?, ?) ON CONFLICT (householder_id, room_id) DO UPDATE SET type = ?, update_datetime = ? """, (householder_id, room_id, _type, now_datetime_second(), _type, now_datetime_second()) ) @staticmethod def add_auth_user_info(table_handler: BaseTable, obj: AddAuthUserInfo): """插入通用人员信息 & 进行通用人员设备授权""" now_datetime = now_datetime_second() face_md5 = get_file_md5(f"{FACES_DIR_PATH}/{obj.face_url}") table_handler.execute( """ INSERT OR IGNORE INTO householders (name, sex, phone, face_url, face_md5, type, user_type, card_type, card_id, auth_start, auth_expire, add_datetime, update_datetime) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, (obj.name, obj.sex, encrypt_number(obj.phone), obj.face_url, face_md5, "非住户", obj.user_type, obj.card_type, obj.card_id, obj.start_datetime, obj.expire_datetime, now_datetime, now_datetime) ) user_id = HouseholdersTable.get_householder_id(table_handler, obj.phone, '非住户') failed_ids = [] codes = [] msgs = [] if user_id: try: # 人脸下置,设备授权 sc = ServicesCall() face_item = AddFaceItem( user_id=user_id, name=obj.name, phone_number=obj.phone, face_url=obj.face_url, start_date=obj.start_datetime.replace(' ', 'T') + ':00.000Z', expire_date=obj.expire_datetime.replace(' ', 'T') + ':00.000Z', device_ids="" ) for device_id in obj.authorized_devices: face_item.device_ids = f"[{device_id}]" _callback, code, msg = sc.add_face(device_id, face_item) if _callback: DevicesTable.insert_device_auth_record(table_handler, device_id, face_item.user_id, face_item.start_date, face_item.expire_date) else: failed_ids.append(device_id) codes.append(code) msgs.append(msg) finally: if 0 < len(failed_ids): table_handler.execute( """ DELETE FROM householders WHERE householder_id = ? """, (user_id,) ) return {"status": False, "message": f"部分设备 - {failed_ids} 人脸授权失败,错误码:{codes},{msgs}"} return {"status": True} @staticmethod def get_householder_id(table_handler: BaseTable, phone: str, _type: str) -> str: table_handler.query("SELECT householder_id FROM householders WHERE phone=? AND type=?", (encrypt_number(phone), _type)) res = table_handler.cursor.fetchall() if res: return res[0][0] else: raise None @staticmethod def get_householder_info(table_handler: BaseTable, search_type, search_key, role_type, page: Optional[int] = None, limit: Optional[int] = None): sub_sql_list = [] if role_type != "全部": sub_sql_list.append(f"types like '%{role_type}%'") if search_type is not None: if search_type == "name": sub_sql_list.append(f"name like '%{search_key}%'") else: sub_sql_list.append(f"{search_type} = '{search_key}'") if sub_sql_list: sub_sql = "WHERE " + " AND ".join(sub_sql_list) else: sub_sql = "" if page is not None and limit is not None: page_sql = f"LIMIT {limit} OFFSET {(page - 1) * limit}" else: page_sql = "" query_sql = f""" SELECT * FROM ( SELECT h.householder_id, h.name, h.sex, h.phone, GROUP_CONCAT(houses.house_name) AS rooms, GROUP_CONCAT(hh.type) AS types, h.add_datetime FROM householders h LEFT JOIN householders_type hh ON h.householder_id = hh.householder_id LEFT JOIN houses ON hh.room_id = houses.room_id WHERE h.type = '住户' GROUP BY h.householder_id, h.name, h.sex, h.phone, h.add_datetime ) {sub_sql} """ table_handler.query(f"SELECT COUNT(*) FROM ({query_sql})") total = table_handler.cursor.fetchall()[0][0] table_handler.query(f"{query_sql} {page_sql}") res = table_handler.cursor.fetchall() if res: householder_info = [] for i in res: householder_info.append(HouseholdersInfo( householder_id=i[0], name=i[1], sex=i[2], phone=decrypt_number(i[3]), rooms=i[4], add_datetime=i[6] ).__dict__) return {"info": householder_info, "total": total} else: return {"info": [], "total": total} @staticmethod def get_auth_householder_info(table_handler: BaseTable, search_type: Optional[str], search_key: Optional[str], space_type: Optional[str], space_id: Optional[str], page: Optional[int] = None, limit: Optional[int] = None): sub_sql_list = [] if search_type is not None and search_key is not None: if search_type == "name": sub_sql_list.append(f"{search_type} like '%{search_key}%'") else: sub_sql_list.append(f"{search_type} = '{search_key}'") if space_type is not None and space_id is not None: space_sub_sql = f""" AND h.householder_id IN ( SELECT DISTINCT h1.householder_id FROM householders h1 LEFT JOIN householders_type hh1 ON h1.householder_id = hh1.householder_id LEFT JOIN houses houses1 ON hh1.room_id = houses1.room_id WHERE houses1.{space_type} = '{space_id}' ) """ else: space_sub_sql = "" if sub_sql_list: sub_sql = "WHERE " + " AND ".join(sub_sql_list) else: sub_sql = "" if page is not None and limit is not None: page_sql = f"LIMIT {limit} OFFSET {(page - 1) * limit}" else: page_sql = "" query_sql = f""" SELECT * FROM ( SELECT h.householder_id, h.name, h.phone, GROUP_CONCAT(houses.house_name || '(' || hh.type || ')') AS rooms FROM householders h LEFT JOIN householders_type hh ON h.householder_id = hh.householder_id LEFT JOIN houses ON hh.room_id = houses.room_id WHERE h.type = '住户' {space_sub_sql} GROUP BY h.householder_id, h.name, h.phone ) {sub_sql} """ table_handler.query(f"SELECT COUNT(*) FROM ({query_sql})") total = table_handler.cursor.fetchall()[0][0] table_handler.query(f"{query_sql} {page_sql}") res = table_handler.cursor.fetchall() if res: householder_info = [] for i in res: householder_info.append(AuthHouseholdersInfo( householder_id=i[0], name=i[1], phone=decrypt_number(i[2]), rooms=i[3] ).__dict__) return {"info": householder_info, "total": total} else: return {"info": [], "total": total} @staticmethod def get_auth_users_info(table_handler: BaseTable, search_type: Optional[str], search_key: Optional[str], user_type: Optional[str], page: Optional[int] = None, limit: Optional[int] = None): sub_sql_list = [] if search_type is not None: if search_type == "name": sub_sql_list.append(f"{search_type} like '%{search_key}%'") else: sub_sql_list.append(f"{search_type} = '{search_key}'") if user_type is not None: sub_sql_list.append(f"user_type = '{user_type}'") if sub_sql_list: sub_sql = "AND " + " AND ".join(sub_sql_list) else: sub_sql = "" if page is not None and limit is not None: page_sql = f"LIMIT {limit} OFFSET {(page - 1) * limit}" else: page_sql = "" query_sql = f""" SELECT householder_id, name, sex, phone, card_type, card_id, user_type, auth_start, auth_expire FROM householders WHERE type != '住户' {sub_sql} """ table_handler.query(f"SELECT COUNT(*) FROM ({query_sql})") total = table_handler.cursor.fetchall()[0][0] table_handler.query(f"{query_sql} {page_sql}") res = table_handler.cursor.fetchall() if res: householder_info = [] for i in res: householder_info.append(AuthUsersInfo( user_id=i[0], name=i[1], sex=i[2], phone=decrypt_number(i[3]), card_type=i[4], card_id=i[5], user_type=i[6], start_datetime=i[7], expire_datetime=i[8] ).__dict__) return {"info": householder_info, "total": total} else: return {"info": [], "total": total} @staticmethod def get_auth_user_info(table_handler: BaseTable, householder_id: int): query_sql = """ SELECT householder_id, name, sex, phone, face_url, card_type, card_id, user_type, auth_start, auth_expire FROM householders WHERE type != '住户' AND householder_id = ? """ table_handler.query(query_sql, (householder_id,)) res = table_handler.cursor.fetchall() if res: householder_info = AuthUsersDetailInfo( user_id=res[0][0], name=res[0][1], sex=res[0][2], phone=decrypt_number(res[0][3]), face_url=f"http://{HOST_IP}:{PORT}{PREFIX_PATH}/{SUB_PATH}/{res[0][4]}" if res[0][4] else '', card_type=res[0][5], card_id=res[0][6], user_type=res[0][7], start_datetime=res[0][8], expire_datetime=res[0][9], authorized_devices=DevicesTable.get_auth_device_info(table_handler, str(res[0][0])) ).__dict__ return householder_info else: raise InvalidException(f"无效ID {householder_id}") @staticmethod def get_room_ids(table_handler: BaseTable, householder_id): """获取住户的所有房产ID""" table_handler.query( """ SELECT room_id FROM householders_type WHERE householder_id = ? """, (householder_id,) ) res = table_handler.cursor.fetchall() if res: return [i[0] for i in res] else: return [] @staticmethod def get_rooms_by_householder_id(table_handler: BaseTable, householder_id): """获取住户的所有房产ID""" table_handler.query( """ SELECT h.house_name, type, ht.room_id FROM householders_type ht LEFT JOIN houses h ON h.room_id = ht.room_id WHERE householder_id = ? """, (householder_id,) ) res = table_handler.cursor.fetchall() if res: return [list(i) for i in res] else: return [] @staticmethod def get_householder_np_by_id(table_handler: BaseTable, householder_id: int): table_handler.query( """ SELECT name, phone FROM householders WHERE householder_id = ? """, (householder_id,) ) res = table_handler.cursor.fetchall() if res: return {"name": res[0][0], "phone": decrypt_number(res[0][1])} else: raise InvalidException(f"无效ID {householder_id}") @staticmethod def get_face_url(table_handler: BaseTable, householder_id: int): table_handler.query( """ SELECT DISTINCT face_url FROM householders WHERE householder_id = ? """, (householder_id,) ) res = table_handler.cursor.fetchall() if res: return res[0][0] else: return "" @staticmethod def get_face_md5(table_handler: BaseTable, householder_id: int): table_handler.query( """ SELECT DISTINCT face_md5 FROM householders WHERE householder_id = ? """, (householder_id,) ) res = table_handler.cursor.fetchall() if res: return res[0][0] else: return "" @staticmethod def get_householder_info_by_id(table_handler: BaseTable, householder_id: int): table_handler.query( """ SELECT name, sex, phone FROM householders WHERE householder_id = ? """, (householder_id,) ) res = table_handler.cursor.fetchall() if res: householder_info = { "householder_id": householder_id, "name": res[0][0], "sex": res[0][1], "phone": decrypt_number(res[0][2]), "property_info": [] } table_handler.query( """ SELECT householders_type.room_id, house_name, area_name, building_name, unit_name, householders_type.type, householder_count FROM householders_type LEFT JOIN houses ON houses.room_id = householders_type.room_id WHERE householder_id = ? """, (householder_id,) ) res = table_handler.cursor.fetchall() if res: for i in res: householder_info["property_info"].append({ "room_id": i[0], "room_name": i[1], "area_name": i[2] if i[2] else "", "building_name": i[3] if i[3] else "", "unit_name": i[4] if i[4] else "", "type": i[5], "household_count": i[6] }) return householder_info return {} @staticmethod def get_auth_householder_detail_info(table_handler: BaseTable, householder_id: int): info = HouseholdersTable.get_householder_info_by_id(table_handler, householder_id) rooms = HouseholdersTable.get_rooms_by_householder_id(table_handler, info["householder_id"]) face_url = HouseholdersTable.get_face_url(table_handler, householder_id) device_ids = DevicesTable.get_auth_device_ids(table_handler, str(householder_id)) authorized_devices = [] for device_id in device_ids: start_date, expire_date = DevicesTable.get_auth_interval(table_handler, str(householder_id), device_id) authorized_devices.append({ "device_name": DevicesTable.get_device_name(table_handler, device_id), "device_type": DevicesTable.get_device_scope_type(table_handler, device_id), "open_type": ["人脸"], "start_date": start_date, "expire_date": expire_date, "method": "自动授权", "from": "系统" }) resp = HouseholderDetailInfo( householder_id=info["householder_id"], name=info["name"], phone=info["phone"], face_url=f"http://{HOST_IP}:{PORT}{PREFIX_PATH}/{SUB_PATH}/{face_url}", rooms=rooms, authorized_devices=authorized_devices ) return resp.__dict__ @staticmethod def update_householder_info(table_handler: BaseTable, householder_id: int, property_info: List[tuple[str, str]]): # 查询对应住户关联的房产信息,比对更新数据中的房产信息 update_room_ids = [i[0] for i in property_info] update_types = [i[1] for i in property_info] table_handler.query( """ SELECT room_id, type FROM householders_type WHERE householder_id = ? """, (householder_id,) ) res = table_handler.cursor.fetchall() if res: update_data = [] delete_data = [] insert_data = [[v, update_types[i]] for i, v in enumerate(update_room_ids)] sqls = [] params = [] update_count = [] for i in res: try: index = update_room_ids.index(i[0]) if update_types[index] != i[1]: update_data.append([i[0], update_types[index]]) insert_data.remove([i[0], update_types[index]]) except ValueError: delete_data.append(i[0]) table_handler.query( """ SELECT name, phone, face_url, auth_start, auth_expire FROM householders WHERE householder_id = ? """, (householder_id,) ) householder_info = table_handler.cursor.fetchall()[0] sc = ServicesCall() face_item = AddFaceItem( user_id=householder_id, name=householder_info[0], phone_number=householder_info[1], face_url=householder_info[2], start_date=householder_info[3].replace(' ', 'T') + ':00.000Z', expire_date=householder_info[4].replace(' ', 'T') + ':00.000Z', device_ids="" ) if update_data: for data in update_data: sqls.append( """ UPDATE householders_type SET type = ?, update_datetime = ? WHERE householder_id = ? AND room_id = ? """ ) params.append((data[1], now_datetime_second(), householder_id, data[0])) if insert_data: for data in insert_data: sqls.append( """ INSERT INTO householders_type (room_id, householder_id, type, update_datetime) VALUES (?, ?, ?, ?) """ ) params.append((data[0], householder_id, data[1], now_datetime_second())) old_count = HousesTable.get_householder_count(table_handler, data[0]) update_count.append([data[0], old_count + 1]) # TODO 同步增加对应设备中的人脸信息 unit_id = HousesTable.get_unit_id_by_room_id(table_handler, data[0]) device_ids = DevicesTable.get_device_ids_by_unit_id(table_handler, unit_id) for device_id in device_ids: face_item.device_ids = f"[{device_id}]" _callback, code, msg = sc.add_face(device_id, face_item) if _callback: DevicesTable.insert_device_auth_record(table_handler, device_id, face_item.user_id, face_item.start_date, face_item.expire_date) logger.Logger.error(f"[update_householder_info] " f"住户[{householder_id}]人脸增加失败,失败代码:{code},提示信息:{msg}") if delete_data: for room_id in delete_data: sqls.append( """ DELETE FROM householders_type WHERE householder_id = ? AND room_id = ? """ ) params.append((householder_id, room_id)) old_count = HousesTable.get_householder_count(table_handler, room_id) update_count.append([room_id, old_count - 1]) # TODO 获取房产-单元 同步移除对应设备中的人脸 unit_id = HousesTable.get_unit_id_by_room_id(table_handler, room_id) device_ids = DevicesTable.get_device_ids_by_unit_id(table_handler, unit_id) for device_id in device_ids: _callback, code, msg = sc.del_face(device_id, DelFaceItem(user_id=householder_id, device_ids=f"[{device_id}]")) if _callback: DevicesTable.delete_invalid_auth_record(table_handler, device_id, str(householder_id)) logger.Logger.error(f"[update_householder_info] " f"住户[{householder_id}]人脸移除失败,失败代码:{code},提示信息:{msg}") if len(update_room_ids) == 0: sqls.append( """ DELETE FROM householders WHERE householder_id = ? """ ) params.append((householder_id,)) table_handler.execute(sqls, params) for i in update_count: HousesTable.update_householder_count(table_handler, i[0], i[1]) return {"status": True} else: return {"status": False, "message": "不存在对应住户或住户名下无关联房产"} @staticmethod def update_auth_user_info(table_handler: BaseTable, obj: UpdateAuthUserInfo): # 通用人员授权信息更新 nd = now_datetime_second().replace(' ', 'T') + '.000Z' sd = obj.start_datetime.replace(' ', 'T') + ':00.000Z' ed = obj.expire_datetime.replace(' ', 'T') + ':00.000Z' sc = ServicesCall() face_item = AddFaceItem( user_id=obj.user_id, name=obj.name, phone_number=obj.phone, face_url=obj.face_url, start_date=sd, expire_date=ed, device_ids="" ) old_face_url = HouseholdersTable.get_face_url(table_handler, obj.user_id) # 查询当前人员授权情况 table_handler.query( """ SELECT device_id, start_date, expire_date FROM devices_auth WHERE _id = ? """, (obj.user_id,) ) res = table_handler.cursor.fetchall() if res: delete_data = [] # 移除权限 insert_data = [[device_id, sd, ed] for device_id in obj.authorized_devices] # 新增权限 for i in res: if old_face_url == obj.face_url: # 对比人脸图片是否变更 # 人脸无实际变更,根据权限变更重新赋权 logger.Logger.debug("新旧人脸一致,按需赋权") if [i[0], sd, ed] in insert_data and ed == i[2] and (sd < nd or sd == i[1]): insert_data.remove([i[0], sd, ed]) else: delete_data.append(i[0]) else: # 若人脸变更直接移除所有人脸授权,重新赋权 delete_data.append(i[0]) delete_failed_ids = [] delete_failed_codes = [] delete_failed_msgs = [] message = "" if delete_data: for device_id in delete_data: _callback, code, msg = sc.del_face(device_id, DelFaceItem( user_id=obj.user_id, device_ids=f"[{device_id}]")) if _callback: DevicesTable.delete_invalid_auth_record(table_handler, device_id, face_item.user_id) else: delete_failed_ids.append(device_id) delete_failed_codes.append(code) delete_failed_msgs.append(msg) if 0 < len(delete_failed_ids) < len(delete_data): message += f"部分设备 - {delete_failed_ids} 人脸移除失败,错误码:{delete_failed_codes}, {delete_failed_msgs}" elif len(delete_failed_ids) == len(delete_data) != 0: raise InvalidException("设备授权无法更新:人脸移除失败") insert_failed_ids = [] insert_failed_codes = [] if insert_data: for data in insert_data: face_item.device_ids = f"[{data[0]}]" _callback, code, msg = sc.add_face(data[0], face_item) if _callback: DevicesTable.insert_device_auth_record(table_handler, data[0], face_item.user_id, face_item.start_date, face_item.expire_date) else: insert_failed_ids.append(data[0]) insert_failed_codes.append(code) if 0 < len(insert_failed_ids) < len(insert_data): message += f"部分设备 - {delete_failed_ids} 人脸授权失败,错误码:{delete_failed_codes}" elif len(delete_failed_ids) == len(delete_data) != 0: raise InvalidException("设备授权无法更新:人脸下发失败") if message != '': return InvalidException(message) else: table_handler.execute( """ UPDATE householders SET name = ?, sex = ?, phone= ?, face_url = ?, face_md5= ?, user_type = ?, card_type = ?, card_id = ?, auth_start = ?, auth_expire = ?, update_datetime = ? WHERE householder_id = ? """, (obj.name, obj.sex, encrypt_number(obj.phone), obj.face_url, get_file_md5(f"{FACES_DIR_PATH}/{obj.face_url}"), obj.user_type, obj.card_type, obj.card_id, sd, ed, now_datetime_second(), obj.user_id) ) return {"status": True} else: return {"status": False, "message": "不存在对应ID的授权"} @staticmethod def update_householder_face(table_handler: BaseTable, householder_id: int, face_url: str): """更新住户信息中的人脸地址 & 人脸地址同步到权限内的设备""" new_md5 = get_file_md5(f"{FACES_DIR_PATH}/{face_url}") if new_md5 != HouseholdersTable.get_face_md5(table_handler, householder_id): table_handler.execute( """ UPDATE householders SET face_url = ?, update_datetime = ? WHERE householder_id = ? """, (face_url, now_datetime_second(), householder_id) ) # 查询住户关联单元,获取单元权限内的设备,下发人脸 table_handler.query( """ SELECT h.householder_id, name, phone, GROUP_CONCAT(unit_id) as unit_ids FROM householders h LEFT JOIN householders_type ht ON ht.householder_id = h.householder_id LEFT JOIN houses ON ht.room_id = houses.room_id WHERE h.type = '住户' AND h.householder_id = ? """, (householder_id,) ) res = table_handler.cursor.fetchall() if res: sc = ServicesCall() face_item = AddFaceItem( user_id=res[0][0], name=res[0][1], phone_number=decrypt_number(res[0][2]), face_url=face_url, device_ids="" ) for unit_id in res[0][3].split(','): device_ids = DevicesTable.get_device_ids_by_unit_id(table_handler, unit_id) for device_id in device_ids: face_item.device_ids = f"[{device_id}]" _callback, code, msg = sc.add_face(device_id, face_item) if _callback: DevicesTable.insert_device_auth_record(table_handler, device_id, face_item.user_id, face_item.start_date, face_item.expire_date) return {"status": True} else: logger.Logger.debug("新旧人脸一致,不做任何变更") return {"status": True} @staticmethod def update_user_info(table_handler: BaseTable, householder_id: int, face_url: str): """更新通用人员信息中的人脸地址 & 人脸地址同步到下发过的设备中""" table_handler.execute( """ UPDATE householders SET face_url = ? WHERE householder_id = ? """, (face_url, householder_id) ) # 查询人员授权过的设备,下发人脸 device_ids = DevicesTable.get_auth_device_ids(table_handler, str(householder_id)) name_phone = HouseholdersTable.get_householder_np_by_id(table_handler, householder_id) sc = ServicesCall() face_item = AddFaceItem( user_id=householder_id, name=name_phone["name"], phone_number=name_phone["phone"], face_url=face_url, device_ids="" ) for device_id in device_ids: face_item.device_ids = f"[{device_id}]" _callback, code, _ = sc.add_face(device_id, face_item) if _callback: DevicesTable.insert_device_auth_record(table_handler, device_id, face_item.user_id, face_item.start_date, face_item.expire_date) return {"status": True} @staticmethod def delete_householder_info(table_handler: BaseTable, householder_id: int): # 确认住户ID有效 if not HouseholdersTable.exists_householder_id(table_handler, householder_id): raise InvalidException("不存在对应住户ID") # 1. 查询当前住户授权的授权的设备ID device_ids = DevicesTable.get_auth_device_ids(table_handler, str(householder_id)) # 2. 对所有授权设备进行授权移除操作 if device_ids: sc = ServicesCall() _callback, code, msg = sc.del_face(device_ids[0], DelFaceItem( user_id=str(householder_id), device_ids=str(device_ids) )) if not _callback: raise InvalidException(f"住户人脸授权移除失败,错误码{code},{msg}") DevicesTable.delete_householder_all_auth(table_handler, str(householder_id)) # 3. 同步更新住户名下房产入住人数,并移除住户记录 room_ids = HouseholdersTable.get_room_ids(table_handler, householder_id) for room_id in room_ids: old_count = HousesTable.get_householder_count(table_handler, room_id) HousesTable.update_householder_count(table_handler, room_id, old_count - 1) sqls = [ "DELETE FROM householders WHERE householder_id = ?", "DELETE FROM householders_type WHERE householder_id = ?" ] table_handler.execute(sqls, [(householder_id,), (householder_id,)]) return {"status": True} @staticmethod def delete_auth_user_info(table_handler: BaseTable, user_id: int): # 确认通用用户ID有效 if not HouseholdersTable.exists_user_id(table_handler, user_id): raise InvalidException("不存在对应通用用户ID") # 1. 查询当前住户授权的授权的设备ID device_ids = DevicesTable.get_auth_device_ids(table_handler, str(user_id)) # 2. 对所有授权设备进行授权移除操作 if device_ids: sc = ServicesCall() _callback, code, msg = sc.del_face(device_ids[0], DelFaceItem( user_id=str(user_id), device_ids=str(device_ids) )) if not _callback: raise InvalidException(f"住户人脸授权移除失败,错误码{code},{msg}") DevicesTable.delete_householder_all_auth(table_handler, str(user_id)) sqls = [ "DELETE FROM householders WHERE householder_id = ?", "DELETE FROM householders_type WHERE householder_id = ?" ] table_handler.execute(sqls, [(user_id,), (user_id,)]) return {"status": True} @staticmethod def delete_type(table_handler: BaseTable, householder_id: int): table_handler.execute( """ DELETE FROM householders_type WHERE householder_id = ? """, (householder_id,) ) @staticmethod def exists_householder_phone(table_handler: BaseTable, phone: str) -> bool: """查询住户手机号码是否存在""" table_handler.query( """ SELECT householder_id FROM householders WHERE type = '住户' AND phone = ? """, (encrypt_number(phone),) ) res = table_handler.cursor.fetchall() if res: return True else: return False @staticmethod def exists_user_phone(table_handler: BaseTable, phone: str): """查询非住户手机号码是否存在""" table_handler.query( """ SELECT householder_id FROM householders WHERE type != '住户' AND phone = ? """, (encrypt_number(phone),) ) res = table_handler.cursor.fetchall() if res: return True else: return False @staticmethod def exists_id(table_handler: BaseTable, householder_id: int) -> bool: """查询ID是否存在""" table_handler.query( """ SELECT householder_id FROM householders WHERE householder_id = ? """, (householder_id,) ) res = table_handler.cursor.fetchall() if res: return True else: return False @staticmethod def exists_householder_id(table_handler: BaseTable, householder_id: int) -> bool: """查询住户ID是否存在""" table_handler.query( """ SELECT householder_id FROM householders WHERE type = '住户' AND householder_id = ? """, (householder_id,) ) res = table_handler.cursor.fetchall() if res: return True else: return False @staticmethod def exists_user_id(table_handler: BaseTable, user_id: int) -> bool: """查询非住户ID是否存在""" table_handler.query( """ SELECT householder_id FROM householders WHERE type != '住户' AND householder_id = ? """, (user_id,) ) res = table_handler.cursor.fetchall() if res: return True else: return False @staticmethod def exists_householder_type(table_handler: BaseTable, householder_id: str, room_id: str) -> bool: """查询住户-房产关联信息是否存在""" table_handler.query( """ SELECT householder_id FROM householders_type WHERE householder_id = ? AND room_id = ? """, (householder_id, room_id) ) res = table_handler.cursor.fetchall() if res: return True else: return False