history-project/SCLP/models/householders.py
2024-10-24 09:23:39 +08:00

1370 lines
55 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding:utf-8 -*-
"""
@Author : xuxingchen
@Contact : xuxingchen@sinochem.com
@Desc : 住户信息表 & 住户-房产关联表 增&删&改&查
"""
import csv
import os
import re
import shutil
from typing import Union, Optional, List
from pydantic import BaseModel, field_validator
from config import PREFIX_PATH, FACES_DIR_PATH, SUB_PATH
from config import APP_HOST as HOST_IP
from config import IMAGE_SERVER_PORT as PORT
from device.call import ServicesCall, DelFaceItem, AddFaceItem
from models.devices import DevicesTable
from models.houses import HousesTable
from utils import logger
from utils.database import BaseTable, get_table_handler
from utils.misc import now_datetime_second, InvalidException, is_image_valid, get_file_md5, encrypt_number, \
decrypt_number
# Identity types a householder may hold for a property. The validators below
# (check_property_info / check_role_type) reject any value outside this list.
ROLE_TYPES = ["业主", "亲属", "租户"]
class HouseholdersInfo(BaseModel):
    """Summary row for one householder as returned by the householder list query.

    ``rooms`` accepts either a comma-separated string or a list. A string is
    split into a list of stripped items; a list is passed through unchanged.
    """
    householder_id: int
    name: str
    sex: str
    phone: str
    rooms: Union[str, list]
    add_datetime: str

    @field_validator("rooms")
    def convert_rooms(cls, value):
        """Split a comma-separated room string into a list of trimmed names."""
        # The declared type also admits a list; calling ``.split`` on it raised
        # AttributeError, so only strings are split.
        if isinstance(value, str):
            return [i.strip() for i in value.split(",")]
        return value
class HouseholderDetailInfo(BaseModel):
    """Detail view of a single householder: identity, face image, rooms and device grants."""
    householder_id: int
    name: str
    phone: str
    # Address of the stored face image.
    face_url: str
    # One entry per property associated with the householder.
    rooms: list
    # One entry per device the householder is authorised on.
    authorized_devices: list
class AuthHouseholdersInfo(BaseModel):
    """List row for a householder in the authorisation view.

    ``rooms`` accepts either a comma-separated string or a list. A string is
    split into a list of stripped items; a list is passed through unchanged.
    """
    householder_id: int
    name: str
    phone: str
    rooms: Union[str, list]

    @field_validator("rooms")
    def convert_rooms(cls, value):
        """Split a comma-separated room string into a list of trimmed names."""
        # The declared type also admits a list; calling ``.split`` on it raised
        # AttributeError, so only strings are split.
        if isinstance(value, str):
            return [i.strip() for i in value.split(",")]
        return value
class AuthUsersInfo(BaseModel):
    """List row describing one authorised user (identity, card and validity window)."""
    user_id: int
    name: str
    sex: str
    phone: str
    card_type: str
    card_id: str
    user_type: str
    # Start and end of the authorisation period.
    start_datetime: str
    expire_datetime: str
class AuthUsersDetailInfo(AuthUsersInfo):
    """``AuthUsersInfo`` extended with the face image address and per-device grants."""
    face_url: str
    # One dict per authorised device.
    authorized_devices: List[dict]
class AddAuthUserInfo(BaseModel):
    """Request payload for registering a general user and granting device access.

    ``check_face_url`` reads ``name`` and ``phone`` from ``values.data``, so it
    relies on those fields being declared (and validated) before ``face_url``.
    """
    name: str
    sex: str
    phone: str
    card_type: str
    card_id: str
    user_type: str
    start_datetime: str
    expire_datetime: str
    face_url: str
    authorized_devices: List[int]

    @field_validator("sex")
    def check_sex(cls, value):
        """Reject any ``sex`` value outside the allowed list."""
        # NOTE(review): both allowed values are empty strings in this copy of
        # the file; the original literals may have been lost - confirm.
        types = ["", ""]
        if value not in types:
            raise InvalidException(f"请提供正确的sex类型 {types}")
        return value

    @field_validator('phone')
    def check_phone(cls, value):
        """Validate the mobile-number format and reject numbers already registered.

        Raises:
            InvalidException: malformed number, or ``exists_user_phone`` reports a duplicate.
        """
        pattern = r'^(13[0-9]|14[01456879]|15[0-35-9]|16[2567]|17[0-8]|18[0-9]|19[0-35-9])\d{8}$'
        # fullmatch instead of search: with search(), "$" also matches just
        # before a trailing newline, so "13800000000\n" used to be accepted.
        if re.fullmatch(pattern, value) is None:
            raise InvalidException("请提供正确的手机号码")
        th = get_table_handler()
        if HouseholdersTable.exists_user_phone(th, value):
            raise InvalidException("手机号码已存在,请勿重复提交")
        return value

    @field_validator("user_type")
    def check_user_type(cls, value):
        """Reject any ``user_type`` outside the allowed list."""
        types = ["员工", "访客", "其他"]
        if value not in types:
            raise InvalidException(f"请提供正确的user_type类型 {types}")
        return value

    @field_validator("face_url")
    def check_face_url(cls, value, values):
        """Check the referenced image and return the file name to store.

        Only the last path segment of ``value`` is used; the file is looked up
        under ``FACES_DIR_PATH``. A file whose name starts with ``_`` is copied
        to ``<name><encrypt_number(phone)>.<ext>`` and the original is removed.

        Raises:
            InvalidException: ``is_image_valid`` rejects the file.
        """
        filename = value.split('/')[-1]
        file_path = str(os.path.join(FACES_DIR_PATH, filename))
        if not is_image_valid(file_path):
            raise InvalidException(f"地址或图片无效,请提供有效的图片地址 {value}")
        if filename.startswith('_'):
            new_filename = \
                f"{values.data.get('name')}{encrypt_number(values.data.get('phone'))}.{filename.split('.')[-1]}"
            shutil.copy(file_path, f"{FACES_DIR_PATH}/{new_filename}")
            os.remove(file_path)
            return new_filename
        return filename

    @field_validator("authorized_devices")
    def check_authorized_devices(cls, value):
        """Require at least one device ID."""
        if len(value) > 0:
            return value
        raise InvalidException("请提供至少一个门禁设备权限")
class UpdateAuthUserInfo(AddAuthUserInfo):
    """Request payload for updating an existing general user.

    Adds the target ``user_id`` and defines its own ``check_phone`` that checks
    the number format only (no duplicate lookup).
    """
    user_id: int
    phone: str

    @field_validator("user_id")
    def check_user_id(cls, value):
        """Require that ``exists_user_id`` knows this ID.

        Raises:
            InvalidException: the ID is unknown.
        """
        th = get_table_handler()
        if HouseholdersTable.exists_user_id(th, value):
            return value
        raise InvalidException("user_id 不存在")

    @field_validator('phone')
    def check_phone(cls, value):
        """Validate the mobile-number format.

        Raises:
            InvalidException: the number does not match the expected pattern.
        """
        pattern = r'^(13[0-9]|14[01456879]|15[0-35-9]|16[2567]|17[0-8]|18[0-9]|19[0-35-9])\d{8}$'
        # fullmatch instead of search: with search(), "$" also matches just
        # before a trailing newline, so "13800000000\n" used to be accepted.
        if re.fullmatch(pattern, value) is None:
            raise InvalidException("请提供正确的手机号码")
        return value
class AddHouseholderInfo(BaseModel):
    """Request payload for creating a householder together with property associations.

    ``property_info`` is a list of ``(room_id, role_type)`` pairs.
    """
    name: str
    sex: str
    phone: str
    property_info: List[tuple[str, str]]

    @field_validator('sex')
    def check_sex(cls, value):
        """Reject any ``sex`` value outside the allowed list."""
        # NOTE(review): both allowed values are empty strings in this copy of
        # the file; the original literals may have been lost - confirm.
        if value not in ["", ""]:
            raise InvalidException("请提供正确的性别类型")
        return value

    @field_validator('phone')
    def check_phone(cls, value):
        """Validate the mobile-number format and reject numbers already registered.

        Raises:
            InvalidException: malformed number, or ``exists_householder_phone``
                reports a duplicate.
        """
        pattern = r'^(13[0-9]|14[01456879]|15[0-35-9]|16[2567]|17[0-8]|18[0-9]|19[0-35-9])\d{8}$'
        # fullmatch instead of search: with search(), "$" also matches just
        # before a trailing newline, so "13800000000\n" used to be accepted.
        if re.fullmatch(pattern, value) is None:
            raise InvalidException("请提供正确的手机号码")
        th = get_table_handler()
        if HouseholdersTable.exists_householder_phone(th, value):
            raise InvalidException("住户手机号码已存在,请勿重复提交")
        return value

    @field_validator('property_info')
    def check_property_info(cls, value):
        """Check every ``(room_id, role_type)`` pair.

        Raises:
            InvalidException: a room is unknown to ``HousesTable.exists``, a
                role is not in ``ROLE_TYPES``, or a room appears more than once.
        """
        th = get_table_handler()
        room_ids = []
        for item in value:
            if not HousesTable.exists(th, item[0]):
                raise InvalidException("对应房产不存在")
            if item[1] not in ROLE_TYPES:
                raise InvalidException("请提供正确的身份类型")
            room_ids.append(item[0])
        if len(set(room_ids)) != len(room_ids):
            raise InvalidException("不能存在重复房产")
        return value
class GetHouseholdersInfo(BaseModel):
    """Query parameters for the householder list: keyword search, role filter and paging."""
    search_type: Optional[str] = None
    search_key: Optional[str] = None
    role_type: str = "全部"
    limit: Optional[int] = None
    page: Optional[int] = None

    @field_validator("search_type")
    def check_search_type(cls, value):
        """Translate the display label of the search field into its column name."""
        column_by_label = {
            "姓名": "name",
            "电话号码": "phone"
        }
        if value in ["姓名", "电话号码"]:
            return column_by_label[value]
        raise InvalidException("请提供正确的关键字类型")

    @field_validator("search_key")
    def check_search_key(cls, value, values):
        """Require a non-empty key together with an already-validated ``search_type``."""
        if not value or not values.data.get("search_type", None):
            raise InvalidException("search 参数缺失")
        return value

    @field_validator("page")
    def check_page(cls, value, values):
        """Require a positive page number and a non-zero ``limit``."""
        if value is None or not value > 0:
            raise InvalidException("page 参数缺失或异常")
        if values.data.get("limit", None) is None or values.data.get("limit") == 0:
            raise InvalidException("page 参数缺失或异常")
        return value

    @field_validator("role_type")
    def check_role_type(cls, value):
        """Accept one of ``ROLE_TYPES`` or the catch-all value."""
        if value in ROLE_TYPES + ["全部"]:
            return value
        raise InvalidException("请提供正确的身份类型")
class GetAuthHouseholdersInfo(BaseModel):
    """Query parameters for the authorised-householder list.

    Supports a keyword search, a filter on a property-hierarchy column and paging.
    """
    search_type: Optional[str] = None
    search_key: Optional[str] = None
    space_type: Optional[str] = None
    space_id: Optional[str] = None
    limit: Optional[int] = None
    page: Optional[int] = None

    @field_validator("search_type")
    def check_search_type(cls, value):
        """Translate the display label of the search field into its column name."""
        types = {
            "住户姓名": "name",
            "手机号码": "phone"
        }
        if value not in types.keys():
            raise InvalidException(f"请提供正确的关键字类型 {list(types.keys())}")
        return types[value]

    @field_validator("search_key")
    def check_search_key(cls, value, values):
        """Normalise the search key; a blank key becomes ``None``.

        Raises:
            InvalidException: no validated ``search_type`` accompanies the key.
        """
        if not values.data.get("search_type", None):
            raise InvalidException("search 参数缺失")
        # An explicit null used to reach ``value.strip()`` and raise
        # AttributeError; it is now treated like a blank key.
        if value is None or value.strip() == "":
            return None
        return value

    @field_validator("space_type")
    def check_space_type(cls, value):
        """Translate the display label of the property level into its column name."""
        types = {
            "区域": "name_id",
            "楼栋": "building_id",
            "单元": "unit_id",
            "楼层": "floor_id",
            "房间号": "room_id"
        }
        if value not in types.keys():
            raise InvalidException(f"请提供正确的关联房产类型 {list(types.keys())}")
        return types[value]

    @field_validator("space_id")
    def check_space_id(cls, value, values):
        """Require a non-empty ID together with an already-validated ``space_type``."""
        if value and values.data.get("space_type", None):
            return value
        raise InvalidException("space 参数缺失")

    @field_validator("page")
    def check_page(cls, value, values):
        """Require a positive page number and a non-zero ``limit``."""
        if (value is not None and value > 0
                and values.data.get("limit", None) is not None and values.data.get("limit") != 0):
            return value
        raise InvalidException("page 参数缺失或异常")
class GetAuthUsersInfo(BaseModel):
    """Query parameters for the general-user list: keyword search, user-type filter and paging."""
    search_type: Optional[str] = None
    search_key: Optional[str] = None
    user_type: Optional[str] = None
    limit: Optional[int] = None
    page: Optional[int] = None

    @field_validator("search_type")
    def check_search_type(cls, value):
        """Translate the display label of the search field into its column name."""
        column_by_label = {
            "姓名": "name",
            "手机号码": "phone"
        }
        if value in column_by_label.keys():
            return column_by_label[value]
        raise InvalidException(f"请提供正确的关键字类型 {list(column_by_label.keys())}")

    @field_validator("search_key")
    def check_search_key(cls, value, values):
        """Require a non-empty key together with an already-validated ``search_type``."""
        if not value or not values.data.get("search_type", None):
            raise InvalidException("search 参数缺失")
        return value

    @field_validator("user_type")
    def check_space_type(cls, value):
        """Accept only a known user type (this validator guards ``user_type``)."""
        known_types = {
            "员工": "员工",
            "访客": "访客",
            "其他": "其他"
        }
        if value in known_types.keys():
            return known_types[value]
        raise InvalidException(f"请提供正确的人员类型 {list(known_types.keys())}")

    @field_validator("page")
    def check_page(cls, value, values):
        """Require a positive page number and a non-zero ``limit``."""
        if value is None or not value > 0:
            raise InvalidException("page 参数缺失或异常")
        if values.data.get("limit", None) is None or values.data.get("limit") == 0:
            raise InvalidException("page 参数缺失或异常")
        return value
class UpdateHouseholderInfo(BaseModel):
    """Request payload replacing a householder's property associations.

    ``property_info`` is the complete desired list of ``(room_id, role_type)`` pairs.
    """
    householder_id: int
    property_info: List[tuple[str, str]]

    @field_validator('property_info')
    def check_property_info(cls, value):
        """Check every ``(room_id, role_type)`` pair.

        Raises:
            InvalidException: a room is unknown to ``HousesTable.exists``, a
                role is not in ``ROLE_TYPES``, or a room appears more than once.
        """
        th = get_table_handler()
        room_ids = []
        for item in value:
            if not HousesTable.exists(th, item[0]):
                raise InvalidException("对应房产不存在")
            if item[1] not in ROLE_TYPES:
                raise InvalidException("请提供正确的身份类型")
            room_ids.append(item[0])
        # Same rule as AddHouseholderInfo: the association table is UNIQUE on
        # (householder_id, room_id), so a repeated room cannot be stored.
        if len(set(room_ids)) != len(room_ids):
            raise InvalidException("不能存在重复房产")
        return value
class UpdateHouseholderFace(BaseModel):
    """Request payload for replacing the face image of an existing person."""
    id: int
    face_url: str

    @field_validator("id")
    def check_id(cls, value):
        """Require that ``exists_id`` knows this ID."""
        handler = get_table_handler()
        if not HouseholdersTable.exists_id(handler, value):
            raise InvalidException(f"无效ID {value}")
        return value

    @field_validator("face_url")
    def check_face_url(cls, value, values):
        """Check the referenced image and return the file name to store.

        Only the last path segment of ``value`` is used; the file is looked up
        under ``FACES_DIR_PATH``.
        """
        filename = value.split('/')[-1]
        file_path = str(os.path.join(FACES_DIR_PATH, filename))
        if not is_image_valid(file_path):
            raise InvalidException(f"地址或图片无效,请提供有效的图片地址 {value}")
        if not filename.startswith('_'):
            return filename
        # A temporary image (leading underscore) is renamed to
        # <name><phone>.<ext> using the person's stored name and phone.
        handler = get_table_handler()
        name_phone = HouseholdersTable.get_householder_np_by_id(handler, values.data.get('id'))
        new_filename = f"{name_phone['name']}{name_phone['phone']}.{filename.split('.')[-1]}"
        shutil.copy(file_path, f"{FACES_DIR_PATH}/{new_filename}")
        os.remove(file_path)
        return new_filename
class HouseholdersTable(BaseTable):
@staticmethod
def check(table_handler: BaseTable):
"""Create the ``householders`` and ``householders_type`` tables if missing and seed them from CSV.

Each table is looked up in ``sqlite_master``; a missing table is created.
Seed rows are read from ``data/InitialData/householders.csv`` and
``data/InitialData/householders_type.csv`` when those files exist.

NOTE(review): indentation is absent in this copy of the file, so whether the
CSV import runs only right after table creation or on every call cannot be
determined here - confirm against the original layout.
"""
# householders: householder information table
table_handler.query("SELECT name FROM sqlite_master WHERE type='table' AND name='householders'")
if table_handler.cursor.fetchone() is None:
table_handler.execute(
"""
CREATE TABLE householders (
householder_id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT,
sex TEXT,
phone TEXT,
face_url TEXT DEFAULT '',
face_md5 TEXT,
type TEXT,
user_type TEXT,
card_type TEXT,
card_id TEXT,
auth_start TEXT,
auth_expire TEXT,
add_datetime TEXT,
update_datetime TEXT,
UNIQUE (phone, type)
)
"""
)
# NOTE: "__file__" is a string literal here, not the module variable, so
# abspath() resolves it against the current working directory; the seed file
# is therefore looked up under <cwd>/data/InitialData/.
init_config_path = os.path.join(os.path.dirname(os.path.abspath("__file__")),
"data/InitialData/householders.csv")
if os.path.exists(init_config_path):
with open(init_config_path, newline='', encoding='utf8') as csvfile:
csvreader = csv.reader(csvfile)
# First row is the header; it is only used to check the column count.
head = next(csvreader)
data = []
if len(head) == 11:
for row in csvreader:
householder_id = row[0].strip()
name = row[1].strip()
sex = row[2].strip()
# The phone is passed through encrypt_number before being stored.
phone = encrypt_number(row[3].strip())
face_url = row[4].strip()
# An MD5 is recorded only when the face file exists under FACES_DIR_PATH.
if face_url and os.path.exists(f"{FACES_DIR_PATH}/{face_url}"):
face_md5 = get_file_md5(f"{FACES_DIR_PATH}/{face_url}")
else:
face_md5 = None
_type = row[5].strip()
# Card and authorisation columns are read only for rows whose type is
# not "住户"; for "住户" rows they are stored as NULL.
if _type != "住户":
user_type = row[6].strip()
card_type = row[7].strip()
card_id = row[8].strip()
auth_start = row[9].strip()
auth_expire = row[10].strip()
else:
user_type, card_type, card_id, auth_start, auth_expire = None, None, None, None, None
data.append((householder_id, name, sex, phone, face_url, face_md5,
_type, user_type, card_type, card_id, auth_start, auth_expire,
now_datetime_second(), now_datetime_second()))
table_handler.executemany(
"""
INSERT OR IGNORE INTO householders
(householder_id, name, sex, phone, face_url, face_md5,
type, user_type, card_type, card_id, auth_start, auth_expire,
add_datetime, update_datetime)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", data
)
# householders_type: householder-to-property association table
table_handler.query("SELECT name FROM sqlite_master WHERE type='table' AND name='householders_type'")
if table_handler.cursor.fetchone() is None:
table_handler.execute(
"""
CREATE TABLE householders_type (
householder_id INT,
room_id TEXT,
type TEXT,
update_datetime TEXT,
UNIQUE (householder_id, room_id)
)
"""
)
# Same working-directory-relative lookup as above ("__file__" is a literal).
init_config_path = os.path.join(os.path.dirname(os.path.abspath("__file__")),
"data/InitialData/householders_type.csv")
if os.path.exists(init_config_path):
with open(init_config_path, newline='', encoding='utf8') as csvfile:
csvreader = csv.reader(csvfile)
head = next(csvreader)
data = []
# Number of newly imported associations per room_id.
count_info = {}
if len(head) == 3:
for row in csvreader:
householder_id = row[0].strip()
room_id = row[1].strip()
_type = row[2].strip()
# Pairs already present in the table are neither counted nor queued.
if not HouseholdersTable.exists_householder_type(table_handler, householder_id, room_id):
if room_id in count_info.keys():
count_info[room_id] += 1
else:
count_info[room_id] = 1
data.append((int(householder_id), room_id, _type, now_datetime_second()))
table_handler.executemany(
"""
INSERT INTO householders_type
(householder_id, room_id, type, update_datetime)
VALUES (?, ?, ?, ?)
ON CONFLICT (householder_id, room_id) DO NOTHING
""", data
)
# Add the imported counts to each house's stored householder count.
for _id, _count in count_info.items():
old_count = HousesTable.get_householder_count(table_handler, _id)
HousesTable.update_householder_count(table_handler, _id, old_count + _count)
@staticmethod
def insert(table_handler: BaseTable, name: str, sex: str, phone: str, _type: str):
    """Insert a person row (ignored when it already exists) and return its ID.

    Args:
        table_handler: database handler used for the insert and the lookup.
        name: person name.
        sex: person sex.
        phone: plain phone number; stored via ``encrypt_number``.
        _type: value for the ``type`` column.

    Returns:
        The ``householder_id`` of the row matching this phone and type, or
        ``None`` when no such row exists.
    """
    encrypted_phone = encrypt_number(phone)
    now = now_datetime_second()
    try:
        table_handler.execute(
            """
            INSERT OR IGNORE INTO householders
            (name, sex, phone, type, add_datetime, update_datetime)
            VALUES (?, ?, ?, ?, ?, ?)
            """, (name, sex, encrypted_phone, _type, now, now)
        )
    except Exception as exc:
        # Best effort, as before: a failed insert still falls through to the
        # lookup. It is logged here instead of being swallowed by a ``return``
        # inside ``finally``.
        logger.Logger.error(f"[insert] householder insert failed: {exc}")
    # The table is UNIQUE on (phone, type). Filtering by phone alone could
    # return the ID of a row of another type that shares the phone.
    table_handler.query(
        """
        SELECT householder_id
        FROM householders
        WHERE phone = ? AND type = ?
        """, (encrypted_phone, _type)
    )
    res = table_handler.cursor.fetchall()
    return res[0][0] if res else None
@staticmethod
def insert_type(table_handler: BaseTable, room_id: str, householder_id: int, _type: str):
    """Create the householder/room association, or update its type when the pair exists."""
    upsert_sql = """
        INSERT INTO householders_type
        (householder_id, room_id, type, update_datetime)
        VALUES (?, ?, ?, ?)
        ON CONFLICT (householder_id, room_id) DO UPDATE
        SET type = ?, update_datetime = ?
        """
    # First timestamp feeds the INSERT values, second one the conflict update.
    inserted_at = now_datetime_second()
    updated_at = now_datetime_second()
    table_handler.execute(
        upsert_sql,
        (householder_id, room_id, _type, inserted_at, _type, updated_at)
    )
@staticmethod
def add_auth_user_info(table_handler: BaseTable, obj: AddAuthUserInfo):
"""Insert a general (non-householder) user and push the face to each authorised device.

Returns ``{"status": True}``, or a dict with ``status`` False and a message
listing the devices that rejected the face.

NOTE(review): indentation is absent in this copy of the file; the comments
below describe the statements in order, not their exact nesting.
"""
now_datetime = now_datetime_second()
face_md5 = get_file_md5(f"{FACES_DIR_PATH}/{obj.face_url}")
# The row is stored with type "非住户" and the phone passed through encrypt_number.
table_handler.execute(
"""
INSERT OR IGNORE INTO householders
(name, sex, phone, face_url, face_md5, type,
user_type, card_type, card_id,
auth_start, auth_expire, add_datetime, update_datetime)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(obj.name, obj.sex, encrypt_number(obj.phone), obj.face_url, face_md5, "非住户",
obj.user_type, obj.card_type, obj.card_id,
obj.start_datetime, obj.expire_datetime, now_datetime, now_datetime)
)
user_id = HouseholdersTable.get_householder_id(table_handler, obj.phone, '非住户')
# Per-device failures are collected so they can be reported together.
failed_ids = []
codes = []
msgs = []
if user_id:
try:
# Push the face to the devices and record each authorisation
sc = ServicesCall()
# The date strings get ':00.000Z' appended after the space is replaced by 'T'.
# NOTE(review): presumably the inputs are 'YYYY-MM-DD HH:MM' - confirm.
face_item = AddFaceItem(
user_id=user_id,
name=obj.name,
phone_number=obj.phone,
face_url=obj.face_url,
start_date=obj.start_datetime.replace(' ', 'T') + ':00.000Z',
expire_date=obj.expire_datetime.replace(' ', 'T') + ':00.000Z',
device_ids=""
)
for device_id in obj.authorized_devices:
# device_ids is rewritten for every call so it names only the current device.
face_item.device_ids = f"[{device_id}]"
_callback, code, msg = sc.add_face(device_id, face_item)
if _callback:
DevicesTable.insert_device_auth_record(table_handler, device_id, face_item.user_id,
face_item.start_date, face_item.expire_date)
else:
failed_ids.append(device_id)
codes.append(code)
msgs.append(msg)
finally:
# Any device failure removes the freshly inserted user row.
# NOTE(review): auth records already written for devices that succeeded are
# not removed in this function - confirm whether that is intended.
if 0 < len(failed_ids):
table_handler.execute(
"""
DELETE FROM householders
WHERE householder_id = ?
""", (user_id,)
)
return {"status": False, "message": f"部分设备 - {failed_ids} 人脸授权失败,错误码:{codes}{msgs}"}
return {"status": True}
@staticmethod
def get_householder_id(table_handler: BaseTable, phone: str, _type: str) -> Optional[int]:
    """Look up the ID of the row with the given phone and type.

    Args:
        table_handler: database handler used for the query.
        phone: plain phone number; matched via ``encrypt_number``.
        _type: value of the ``type`` column to match.

    Returns:
        The ``householder_id`` of the first matching row, or ``None`` when
        there is no match.
    """
    table_handler.query("SELECT householder_id FROM householders WHERE phone=? AND type=?",
                        (encrypt_number(phone), _type))
    res = table_handler.cursor.fetchall()
    if res:
        return res[0][0]
    # ``raise None`` is itself a TypeError; callers test the result for
    # truthiness, so the no-match case returns None.
    return None
@staticmethod
def get_householder_info(table_handler: BaseTable, search_type, search_key, role_type,
                         page: Optional[int] = None, limit: Optional[int] = None):
    """List householders with their rooms, optionally filtered and paged.

    Args:
        table_handler: database handler used for the queries.
        search_type: column name to search on (``"name"`` is matched with
            LIKE, anything else with equality), or ``None`` for no search.
        search_key: value to search for.
        role_type: role to filter on; ``"全部"`` disables the filter.
        page: 1-based page number; paging applies only when ``limit`` is set too.
        limit: page size.

    Returns:
        ``{"info": [row dict, ...], "total": <count before paging>}``.

    NOTE(review): stored phones go through ``decrypt_number`` on output, so an
    equality search on ``phone`` presumably needs the key in stored form -
    confirm against the caller.
    """
    conditions = []
    params = []
    # Values are bound as parameters instead of interpolated into the SQL:
    # the search key is user input and previously allowed SQL injection.
    if role_type != "全部":
        conditions.append("types like ?")
        params.append(f"%{role_type}%")
    if search_type is not None:
        if search_type == "name":
            conditions.append("name like ?")
            params.append(f"%{search_key}%")
        else:
            # A column name cannot be bound; only the compared value is.
            conditions.append(f"{search_type} = ?")
            params.append(str(search_key))
    sub_sql = "WHERE " + " AND ".join(conditions) if conditions else ""
    if page is not None and limit is not None:
        page_sql = "LIMIT ? OFFSET ?"
        page_params = [limit, (page - 1) * limit]
    else:
        page_sql = ""
        page_params = []
    query_sql = f"""
        SELECT * FROM (
            SELECT h.householder_id,
                   h.name,
                   h.sex,
                   h.phone,
                   GROUP_CONCAT(houses.house_name) AS rooms,
                   GROUP_CONCAT(hh.type) AS types,
                   h.add_datetime
            FROM householders h
            LEFT JOIN householders_type hh ON h.householder_id = hh.householder_id
            LEFT JOIN houses ON hh.room_id = houses.room_id
            WHERE h.type = '住户'
            GROUP BY h.householder_id, h.name, h.sex, h.phone, h.add_datetime
        ) {sub_sql}
    """
    # Total is counted before paging is applied.
    table_handler.query(f"SELECT COUNT(*) FROM ({query_sql})", tuple(params))
    total = table_handler.cursor.fetchall()[0][0]
    table_handler.query(f"{query_sql} {page_sql}", tuple(params + page_params))
    res = table_handler.cursor.fetchall()
    householder_info = [
        HouseholdersInfo(
            householder_id=i[0],
            name=i[1],
            sex=i[2],
            phone=decrypt_number(i[3]),
            rooms=i[4],
            add_datetime=i[6]
        ).__dict__
        for i in res
    ]
    return {"info": householder_info, "total": total}
@staticmethod
def get_auth_householder_info(table_handler: BaseTable,
                              search_type: Optional[str], search_key: Optional[str],
                              space_type: Optional[str], space_id: Optional[str],
                              page: Optional[int] = None, limit: Optional[int] = None):
    """List householders for the authorisation view, optionally filtered and paged.

    Args:
        table_handler: database handler used for the queries.
        search_type: column name to search on (``"name"`` uses LIKE, anything
            else equality); ignored unless ``search_key`` is also given.
        search_key: value to search for.
        space_type: column of ``houses`` to filter on; ignored unless
            ``space_id`` is also given.
        space_id: value the ``space_type`` column must equal.
        page: 1-based page number; paging applies only when ``limit`` is set too.
        limit: page size.

    Returns:
        ``{"info": [row dict, ...], "total": <count before paging>}``.
    """
    # Values are bound as parameters instead of interpolated into the SQL:
    # search_key and space_id are user input and previously allowed injection.
    # Column names cannot be bound and are still interpolated.
    outer_conditions = []
    outer_params = []
    if search_type is not None and search_key is not None:
        if search_type == "name":
            outer_conditions.append(f"{search_type} like ?")
            outer_params.append(f"%{search_key}%")
        else:
            outer_conditions.append(f"{search_type} = ?")
            outer_params.append(str(search_key))
    inner_params = []
    if space_type is not None and space_id is not None:
        space_sub_sql = f"""
            AND h.householder_id IN (
                SELECT DISTINCT h1.householder_id
                FROM householders h1
                LEFT JOIN householders_type hh1 ON h1.householder_id = hh1.householder_id
                LEFT JOIN houses houses1 ON hh1.room_id = houses1.room_id
                WHERE houses1.{space_type} = ?
            )
        """
        inner_params.append(str(space_id))
    else:
        space_sub_sql = ""
    sub_sql = "WHERE " + " AND ".join(outer_conditions) if outer_conditions else ""
    if page is not None and limit is not None:
        page_sql = "LIMIT ? OFFSET ?"
        page_params = [limit, (page - 1) * limit]
    else:
        page_sql = ""
        page_params = []
    query_sql = f"""
        SELECT * FROM (
            SELECT h.householder_id, h.name, h.phone,
                   GROUP_CONCAT(houses.house_name || '' || hh.type || '') AS rooms
            FROM householders h
            LEFT JOIN householders_type hh ON h.householder_id = hh.householder_id
            LEFT JOIN houses ON hh.room_id = houses.room_id
            WHERE h.type = '住户' {space_sub_sql}
            GROUP BY h.householder_id, h.name, h.phone
        ) {sub_sql}
    """
    # Placeholder order follows the SQL text: inner space filter first, then
    # the outer search filter, then paging.
    filter_params = inner_params + outer_params
    table_handler.query(f"SELECT COUNT(*) FROM ({query_sql})", tuple(filter_params))
    total = table_handler.cursor.fetchall()[0][0]
    table_handler.query(f"{query_sql} {page_sql}", tuple(filter_params + page_params))
    res = table_handler.cursor.fetchall()
    householder_info = [
        AuthHouseholdersInfo(
            householder_id=i[0],
            name=i[1],
            phone=decrypt_number(i[2]),
            rooms=i[3]
        ).__dict__
        for i in res
    ]
    return {"info": householder_info, "total": total}
@staticmethod
def get_auth_users_info(table_handler: BaseTable,
                        search_type: Optional[str], search_key: Optional[str], user_type: Optional[str],
                        page: Optional[int] = None, limit: Optional[int] = None):
    """List non-householder users, optionally filtered and paged.

    Args:
        table_handler: database handler used for the queries.
        search_type: column name to search on (``"name"`` uses LIKE, anything
            else equality), or ``None`` for no search.
        search_key: value to search for.
        user_type: required value of the ``user_type`` column, or ``None``.
        page: 1-based page number; paging applies only when ``limit`` is set too.
        limit: page size.

    Returns:
        ``{"info": [row dict, ...], "total": <count before paging>}``.
    """
    # Values are bound as parameters instead of interpolated into the SQL:
    # the search key is user input and previously allowed SQL injection.
    # The column name cannot be bound and is still interpolated.
    conditions = []
    params = []
    if search_type is not None:
        if search_type == "name":
            conditions.append(f"{search_type} like ?")
            params.append(f"%{search_key}%")
        else:
            conditions.append(f"{search_type} = ?")
            params.append(str(search_key))
    if user_type is not None:
        conditions.append("user_type = ?")
        params.append(str(user_type))
    sub_sql = "AND " + " AND ".join(conditions) if conditions else ""
    if page is not None and limit is not None:
        page_sql = "LIMIT ? OFFSET ?"
        page_params = [limit, (page - 1) * limit]
    else:
        page_sql = ""
        page_params = []
    query_sql = f"""
        SELECT householder_id, name, sex, phone, card_type, card_id, user_type, auth_start, auth_expire
        FROM householders
        WHERE type != '住户' {sub_sql}
    """
    # Total is counted before paging is applied.
    table_handler.query(f"SELECT COUNT(*) FROM ({query_sql})", tuple(params))
    total = table_handler.cursor.fetchall()[0][0]
    table_handler.query(f"{query_sql} {page_sql}", tuple(params + page_params))
    res = table_handler.cursor.fetchall()
    householder_info = [
        AuthUsersInfo(
            user_id=i[0], name=i[1], sex=i[2], phone=decrypt_number(i[3]),
            card_type=i[4], card_id=i[5], user_type=i[6],
            start_datetime=i[7], expire_datetime=i[8]
        ).__dict__
        for i in res
    ]
    return {"info": householder_info, "total": total}
@staticmethod
def get_auth_user_info(table_handler: BaseTable, householder_id: int):
    """Return the detail record of one non-householder user as a dict.

    Raises:
        InvalidException: no non-householder row has this ID.
    """
    table_handler.query(
        """
        SELECT householder_id, name, sex, phone, face_url, card_type, card_id, user_type, auth_start, auth_expire
        FROM householders
        WHERE type != '住户' AND householder_id = ?
        """, (householder_id,)
    )
    rows = table_handler.cursor.fetchall()
    if not rows:
        raise InvalidException(f"无效ID {householder_id}")
    row = rows[0]
    detail = AuthUsersDetailInfo(
        user_id=row[0],
        name=row[1],
        sex=row[2],
        phone=decrypt_number(row[3]),
        # An empty stored file name yields an empty address rather than a URL.
        face_url=f"http://{HOST_IP}:{PORT}{PREFIX_PATH}/{SUB_PATH}/{row[4]}" if row[4] else '',
        card_type=row[5],
        card_id=row[6],
        user_type=row[7],
        start_datetime=row[8],
        expire_datetime=row[9],
        authorized_devices=DevicesTable.get_auth_device_info(table_handler, str(row[0]))
    )
    return detail.__dict__
@staticmethod
def get_room_ids(table_handler: BaseTable, householder_id):
    """Return the room IDs associated with a householder (empty list when none)."""
    table_handler.query(
        """
        SELECT room_id
        FROM householders_type
        WHERE householder_id = ?
        """, (householder_id,)
    )
    rows = table_handler.cursor.fetchall()
    if not rows:
        return []
    return [row[0] for row in rows]
@staticmethod
def get_rooms_by_householder_id(table_handler: BaseTable, householder_id):
    """Return ``[house_name, type, room_id]`` for every room of a householder.

    An empty list is returned when the householder has no associated room.
    """
    table_handler.query(
        """
        SELECT h.house_name, type, ht.room_id
        FROM householders_type ht
        LEFT JOIN houses h ON h.room_id = ht.room_id
        WHERE householder_id = ?
        """, (householder_id,)
    )
    rows = table_handler.cursor.fetchall()
    if not rows:
        return []
    return [list(row) for row in rows]
@staticmethod
def get_householder_np_by_id(table_handler: BaseTable, householder_id: int):
    """Return ``{"name": ..., "phone": ...}`` for the given ID, phone decrypted.

    Raises:
        InvalidException: no row has this ID.
    """
    table_handler.query(
        """
        SELECT name, phone
        FROM householders
        WHERE householder_id = ?
        """, (householder_id,)
    )
    rows = table_handler.cursor.fetchall()
    if not rows:
        raise InvalidException(f"无效ID {householder_id}")
    first = rows[0]
    return {"name": first[0], "phone": decrypt_number(first[1])}
@staticmethod
def get_face_url(table_handler: BaseTable, householder_id: int):
    """Return the stored ``face_url`` for the given ID, or ``""`` when no row matches."""
    table_handler.query(
        """
        SELECT DISTINCT face_url
        FROM householders
        WHERE householder_id = ?
        """, (householder_id,)
    )
    rows = table_handler.cursor.fetchall()
    return rows[0][0] if rows else ""
@staticmethod
def get_face_md5(table_handler: BaseTable, householder_id: int):
    """Return the stored ``face_md5`` for the given ID, or ``""`` when no row matches."""
    table_handler.query(
        """
        SELECT DISTINCT face_md5
        FROM householders
        WHERE householder_id = ?
        """, (householder_id,)
    )
    rows = table_handler.cursor.fetchall()
    return rows[0][0] if rows else ""
@staticmethod
def get_householder_info_by_id(table_handler: BaseTable, householder_id: int):
    """Return a person's basic data plus one entry per associated property.

    Returns:
        A dict with ``householder_id``, ``name``, ``sex``, ``phone`` (decrypted)
        and ``property_info``; an empty dict when no row has this ID.
    """
    table_handler.query(
        """
        SELECT name, sex, phone
        FROM householders
        WHERE householder_id = ?
        """, (householder_id,)
    )
    person_rows = table_handler.cursor.fetchall()
    if not person_rows:
        return {}
    person = person_rows[0]
    householder_info = {
        "householder_id": householder_id,
        "name": person[0],
        "sex": person[1],
        "phone": decrypt_number(person[2]),
        "property_info": []
    }
    table_handler.query(
        """
        SELECT householders_type.room_id, house_name, area_name, building_name, unit_name,
               householders_type.type, householder_count
        FROM householders_type
        LEFT JOIN houses ON houses.room_id = householders_type.room_id
        WHERE householder_id = ?
        """, (householder_id,)
    )
    property_rows = table_handler.cursor.fetchall()
    for prop in property_rows or []:
        # Falsy area/building/unit names are reported as empty strings.
        householder_info["property_info"].append({
            "room_id": prop[0],
            "room_name": prop[1],
            "area_name": prop[2] if prop[2] else "",
            "building_name": prop[3] if prop[3] else "",
            "unit_name": prop[4] if prop[4] else "",
            "type": prop[5],
            "household_count": prop[6]
        })
    return householder_info
@staticmethod
def get_auth_householder_detail_info(table_handler: BaseTable, householder_id: int):
    """Return the detail view of one householder: identity, face address, rooms and device grants.

    Raises:
        InvalidException: no row has this ID (previously this surfaced as a
            KeyError on the empty lookup result).
    """
    info = HouseholdersTable.get_householder_info_by_id(table_handler, householder_id)
    if not info:
        raise InvalidException(f"无效ID {householder_id}")
    rooms = HouseholdersTable.get_rooms_by_householder_id(table_handler, info["householder_id"])
    face_url = HouseholdersTable.get_face_url(table_handler, householder_id)
    device_ids = DevicesTable.get_auth_device_ids(table_handler, str(householder_id))
    authorized_devices = []
    for device_id in device_ids:
        start_date, expire_date = DevicesTable.get_auth_interval(table_handler, str(householder_id), device_id)
        authorized_devices.append({
            "device_name": DevicesTable.get_device_name(table_handler, device_id),
            "device_type": DevicesTable.get_device_scope_type(table_handler, device_id),
            "open_type": ["人脸"],
            "start_date": start_date,
            "expire_date": expire_date,
            "method": "自动授权",
            "from": "系统"
        })
    resp = HouseholderDetailInfo(
        householder_id=info["householder_id"],
        name=info["name"],
        phone=info["phone"],
        # As in get_auth_user_info: no stored file name means an empty address,
        # not a URL that points at the image directory.
        face_url=f"http://{HOST_IP}:{PORT}{PREFIX_PATH}/{SUB_PATH}/{face_url}" if face_url else '',
        rooms=rooms,
        authorized_devices=authorized_devices
    )
    return resp.__dict__
@staticmethod
def update_householder_info(table_handler: BaseTable,
                            householder_id: int,
                            property_info: List[tuple[str, str]]):
    """Replace a householder's property associations with ``property_info``.

    The stored associations are compared with the requested ``(room_id, type)``
    pairs: changed types are updated, new rooms inserted, and rooms no longer
    listed are deleted. For inserted and deleted rooms the face is added to or
    removed from the devices of the room's unit, and the per-house householder
    count is adjusted. When the requested list is empty the householder row
    itself is deleted.

    Returns:
        ``{"status": True}`` on completion, or a dict with ``status`` False and
        a message when the householder has no stored association.
    """
    update_room_ids = [i[0] for i in property_info]
    update_types = [i[1] for i in property_info]
    table_handler.query(
        """
        SELECT room_id, type
        FROM householders_type
        WHERE householder_id = ?
        """, (householder_id,)
    )
    res = table_handler.cursor.fetchall()
    if not res:
        return {"status": False, "message": "不存在对应住户或住户名下无关联房产"}

    update_data = []
    delete_data = []
    # Start from "everything is new" and strike out rooms that already exist.
    insert_data = [[v, update_types[i]] for i, v in enumerate(update_room_ids)]
    sqls = []
    params = []
    update_count = []
    for i in res:
        try:
            index = update_room_ids.index(i[0])
            if update_types[index] != i[1]:
                update_data.append([i[0], update_types[index]])
            insert_data.remove([i[0], update_types[index]])
        except ValueError:
            # Stored room is absent from the request: schedule its removal.
            delete_data.append(i[0])
    table_handler.query(
        """
        SELECT name, phone, face_url, auth_start, auth_expire
        FROM householders
        WHERE householder_id = ?
        """, (householder_id,)
    )
    householder_info = table_handler.cursor.fetchall()[0]
    sc = ServicesCall()
    face_kwargs = {
        "user_id": householder_id,
        "name": householder_info[0],
        # The stored phone is encrypted; devices get the plain number, as in
        # the other face-push paths.
        "phone_number": decrypt_number(householder_info[1]),
        "face_url": householder_info[2],
        "device_ids": "",
    }
    # auth_start/auth_expire may be NULL; calling .replace on None crashed.
    # Missing dates are left to AddFaceItem's defaults.
    if householder_info[3]:
        face_kwargs["start_date"] = householder_info[3].replace(' ', 'T') + ':00.000Z'
    if householder_info[4]:
        face_kwargs["expire_date"] = householder_info[4].replace(' ', 'T') + ':00.000Z'
    face_item = AddFaceItem(**face_kwargs)
    for data in update_data:
        sqls.append(
            """
            UPDATE householders_type
            SET type = ?, update_datetime = ?
            WHERE householder_id = ? AND room_id = ?
            """
        )
        params.append((data[1], now_datetime_second(), householder_id, data[0]))
    for data in insert_data:
        sqls.append(
            """
            INSERT INTO householders_type
            (room_id, householder_id, type, update_datetime)
            VALUES (?, ?, ?, ?)
            """
        )
        params.append((data[0], householder_id, data[1], now_datetime_second()))
        old_count = HousesTable.get_householder_count(table_handler, data[0])
        update_count.append([data[0], old_count + 1])
        # Add the face to every device of the room's unit.
        unit_id = HousesTable.get_unit_id_by_room_id(table_handler, data[0])
        device_ids = DevicesTable.get_device_ids_by_unit_id(table_handler, unit_id)
        for device_id in device_ids:
            face_item.device_ids = f"[{device_id}]"
            _callback, code, msg = sc.add_face(device_id, face_item)
            if _callback:
                DevicesTable.insert_device_auth_record(table_handler, device_id, face_item.user_id,
                                                       face_item.start_date, face_item.expire_date)
            else:
                # Log only real failures; this used to be logged on success too.
                logger.Logger.error(f"[update_householder_info] "
                                    f"住户[{householder_id}]人脸增加失败,失败代码:{code},提示信息:{msg}")
    for room_id in delete_data:
        sqls.append(
            """
            DELETE FROM householders_type
            WHERE householder_id = ? AND room_id = ?
            """
        )
        params.append((householder_id, room_id))
        old_count = HousesTable.get_householder_count(table_handler, room_id)
        update_count.append([room_id, old_count - 1])
        # Remove the face from every device of the room's unit.
        unit_id = HousesTable.get_unit_id_by_room_id(table_handler, room_id)
        device_ids = DevicesTable.get_device_ids_by_unit_id(table_handler, unit_id)
        for device_id in device_ids:
            _callback, code, msg = sc.del_face(device_id,
                                               DelFaceItem(user_id=householder_id, device_ids=f"[{device_id}]"))
            if _callback:
                DevicesTable.delete_invalid_auth_record(table_handler, device_id, str(householder_id))
            else:
                # Log only real failures; this used to be logged on success too.
                logger.Logger.error(f"[update_householder_info] "
                                    f"住户[{householder_id}]人脸移除失败,失败代码:{code},提示信息:{msg}")
    if len(update_room_ids) == 0:
        # No property left: the householder row is removed as well.
        sqls.append(
            """
            DELETE FROM householders
            WHERE householder_id = ?
            """
        )
        params.append((householder_id,))
    table_handler.execute(sqls, params)
    for i in update_count:
        HousesTable.update_householder_count(table_handler, i[0], i[1])
    return {"status": True}
@staticmethod
def update_auth_user_info(table_handler: BaseTable, obj: UpdateAuthUserInfo):
    """Update a general user's data and re-synchronise their device authorisations.

    Existing authorisations are compared with the requested devices and period.
    Grants that are no longer valid (or all of them when the face image
    changed) are removed from the devices, missing ones are pushed, and the
    user row is updated only when every device call succeeded.

    Returns:
        ``{"status": True}`` on success, or a dict with ``status`` False and a
        message when the user has no stored authorisation.

    Raises:
        InvalidException: every removal or every push failed, or some device
            calls failed (the message lists the affected devices).
    """
    nd = now_datetime_second().replace(' ', 'T') + '.000Z'
    sd = obj.start_datetime.replace(' ', 'T') + ':00.000Z'
    ed = obj.expire_datetime.replace(' ', 'T') + ':00.000Z'
    sc = ServicesCall()
    face_item = AddFaceItem(
        user_id=obj.user_id,
        name=obj.name,
        phone_number=obj.phone,
        face_url=obj.face_url,
        start_date=sd,
        expire_date=ed,
        device_ids=""
    )
    old_face_url = HouseholdersTable.get_face_url(table_handler, obj.user_id)
    # Current authorisations of this user.
    table_handler.query(
        """
        SELECT device_id, start_date, expire_date
        FROM devices_auth
        WHERE _id = ?
        """, (obj.user_id,)
    )
    res = table_handler.cursor.fetchall()
    if not res:
        return {"status": False, "message": "不存在对应ID的授权"}

    delete_data = []  # grants to remove
    insert_data = [[device_id, sd, ed] for device_id in obj.authorized_devices]  # grants to add
    for i in res:
        if old_face_url == obj.face_url:
            # Face unchanged: keep grants whose period still matches.
            logger.Logger.debug("新旧人脸一致,按需赋权")
            if [i[0], sd, ed] in insert_data and ed == i[2] and (sd < nd or sd == i[1]):
                insert_data.remove([i[0], sd, ed])
            else:
                delete_data.append(i[0])
        else:
            # Face changed: drop every existing grant and push again.
            delete_data.append(i[0])

    message = ""
    delete_failed_ids = []
    delete_failed_codes = []
    delete_failed_msgs = []
    for device_id in delete_data:
        _callback, code, msg = sc.del_face(device_id, DelFaceItem(
            user_id=obj.user_id, device_ids=f"[{device_id}]"))
        if _callback:
            DevicesTable.delete_invalid_auth_record(table_handler, device_id, face_item.user_id)
        else:
            delete_failed_ids.append(device_id)
            delete_failed_codes.append(code)
            delete_failed_msgs.append(msg)
    if 0 < len(delete_failed_ids) < len(delete_data):
        message += f"部分设备 - {delete_failed_ids} 人脸移除失败,错误码:{delete_failed_codes}, {delete_failed_msgs}"
    elif len(delete_failed_ids) == len(delete_data) != 0:
        raise InvalidException("设备授权无法更新:人脸移除失败")

    insert_failed_ids = []
    insert_failed_codes = []
    for data in insert_data:
        face_item.device_ids = f"[{data[0]}]"
        _callback, code, msg = sc.add_face(data[0], face_item)
        if _callback:
            DevicesTable.insert_device_auth_record(table_handler, data[0], face_item.user_id,
                                                   face_item.start_date, face_item.expire_date)
        else:
            insert_failed_ids.append(data[0])
            insert_failed_codes.append(code)
    # These two branches used to report and test the *delete* failure lists,
    # so push failures were mislabelled and a total push failure went unnoticed.
    if 0 < len(insert_failed_ids) < len(insert_data):
        message += f"部分设备 - {insert_failed_ids} 人脸授权失败,错误码:{insert_failed_codes}"
    elif len(insert_failed_ids) == len(insert_data) != 0:
        raise InvalidException("设备授权无法更新:人脸下发失败")

    if message != '':
        # The exception was previously returned instead of raised.
        raise InvalidException(message)
    table_handler.execute(
        """
        UPDATE householders
        SET name = ?, sex = ?, phone= ?,
            face_url = ?, face_md5= ?,
            user_type = ?, card_type = ?, card_id = ?,
            auth_start = ?, auth_expire = ?, update_datetime = ?
        WHERE householder_id = ?
        """, (obj.name, obj.sex, encrypt_number(obj.phone),
              obj.face_url, get_file_md5(f"{FACES_DIR_PATH}/{obj.face_url}"),
              obj.user_type, obj.card_type, obj.card_id,
              # Store the dates as submitted, like add_auth_user_info does;
              # the device-formatted sd/ed are only for the device calls.
              obj.start_datetime, obj.expire_datetime, now_datetime_second(), obj.user_id)
    )
    return {"status": True}
@staticmethod
def update_householder_face(table_handler: BaseTable, householder_id: int, face_url: str):
    """Store a householder's new face image and push it to the devices of their units.

    Nothing is changed when the MD5 of the new file equals the stored
    ``face_md5``.

    Args:
        table_handler: database handler used for the queries.
        householder_id: ID of the householder.
        face_url: file name of the image under ``FACES_DIR_PATH``.

    Returns:
        ``{"status": True}`` in every case.
    """
    new_md5 = get_file_md5(f"{FACES_DIR_PATH}/{face_url}")
    if new_md5 == HouseholdersTable.get_face_md5(table_handler, householder_id):
        logger.Logger.debug("新旧人脸一致,不做任何变更")
        return {"status": True}
    # Persist the MD5 together with the file name; it was never written
    # before, so the unchanged-face check above could not match for rows
    # updated through this method.
    table_handler.execute(
        """
        UPDATE householders
        SET face_url = ?, face_md5 = ?, update_datetime = ? WHERE householder_id = ?
        """, (face_url, new_md5, now_datetime_second(), householder_id)
    )
    # Find the householder's units, then push the face to each unit's devices.
    table_handler.query(
        """
        SELECT h.householder_id, name, phone, GROUP_CONCAT(unit_id) as unit_ids
        FROM householders h
        LEFT JOIN householders_type ht ON ht.householder_id = h.householder_id
        LEFT JOIN houses ON ht.room_id = houses.room_id
        WHERE h.type = '住户' AND h.householder_id = ?
        """, (householder_id,)
    )
    res = table_handler.cursor.fetchall()
    # An aggregate query without GROUP BY yields one all-NULL row when nothing
    # matches, so the ID column is checked as well.
    if res and res[0][0] is not None:
        sc = ServicesCall()
        face_item = AddFaceItem(
            user_id=res[0][0],
            name=res[0][1],
            phone_number=decrypt_number(res[0][2]),
            face_url=face_url,
            device_ids=""
        )
        # unit_ids is NULL when the householder has no room; repeated units
        # (several rooms in one unit) are pushed once.
        unit_ids = res[0][3].split(',') if res[0][3] else []
        for unit_id in dict.fromkeys(unit_ids):
            device_ids = DevicesTable.get_device_ids_by_unit_id(table_handler, unit_id)
            for device_id in device_ids:
                face_item.device_ids = f"[{device_id}]"
                _callback, code, msg = sc.add_face(device_id, face_item)
                if _callback:
                    DevicesTable.insert_device_auth_record(table_handler, device_id, face_item.user_id,
                                                           face_item.start_date, face_item.expire_date)
    return {"status": True}
@staticmethod
def update_user_info(table_handler: BaseTable, householder_id: int, face_url: str):
    """Update a general user's face image and re-issue it to authorized devices.

    The user row is updated with the new face path, the MD5 of the new image
    file and the update time, so the stored hash always describes the stored
    path. The face is then pushed to every device returned by
    ``DevicesTable.get_auth_device_ids`` for this user; a device auth record
    is stored for every successful push, failed pushes are logged and
    skipped.

    :param table_handler: database handler used for all queries and updates
    :param householder_id: ID of the user whose face is replaced
    :param face_url: face image path relative to ``FACES_DIR_PATH``
    :return: ``{"status": True}``
    """
    table_handler.execute(
        """
        UPDATE householders
        SET face_url = ?, face_md5 = ?, update_datetime = ? WHERE householder_id = ?
        """, (face_url, get_file_md5(f"{FACES_DIR_PATH}/{face_url}"),
              now_datetime_second(), householder_id)
    )
    # Fetch the devices this person is already authorized on and push the face.
    device_ids = DevicesTable.get_auth_device_ids(table_handler, str(householder_id))
    name_phone = HouseholdersTable.get_householder_np_by_id(table_handler, householder_id)
    sc = ServicesCall()
    face_item = AddFaceItem(
        user_id=householder_id,
        name=name_phone["name"],
        phone_number=name_phone["phone"],
        face_url=face_url,
        device_ids=""
    )
    for device_id in device_ids:
        face_item.device_ids = f"[{device_id}]"
        _callback, code, msg = sc.add_face(device_id, face_item)
        if _callback:
            DevicesTable.insert_device_auth_record(table_handler, device_id, face_item.user_id,
                                                   face_item.start_date, face_item.expire_date)
        else:
            logger.Logger.debug(f"设备 {device_id} 人脸下发失败,错误码:{code}, {msg}")
    return {"status": True}
@staticmethod
def delete_householder_info(table_handler: BaseTable, householder_id: int):
    """Delete a resident together with their device authorizations.

    Steps:
      1. Verify the resident ID exists.
      2. Ask the device service to remove the face from the authorized
         devices and drop the stored auth records.
      3. Decrement the occupant count of every room of the resident
         (never below zero) and delete the resident rows.

    :param table_handler: database handler used for all queries and updates
    :param householder_id: ID of the resident to delete
    :return: ``{"status": True}``
    :raises InvalidException: if the ID does not exist or the face removal
        request is rejected
    """
    # Make sure the resident ID is valid.
    if not HouseholdersTable.exists_householder_id(table_handler, householder_id):
        raise InvalidException("不存在对应住户ID")
    # 1. Devices the resident is currently authorized on.
    device_ids = DevicesTable.get_auth_device_ids(table_handler, str(householder_id))
    # 2. Revoke the authorization on all of them.
    if device_ids:
        sc = ServicesCall()
        _callback, code, msg = sc.del_face(device_ids[0], DelFaceItem(
            user_id=str(householder_id),
            device_ids=str(device_ids)
        ))
        if not _callback:
            raise InvalidException(f"住户人脸授权移除失败,错误码{code}{msg}")
        DevicesTable.delete_householder_all_auth(table_handler, str(householder_id))
    # 3. Update the occupant count of the resident's rooms, then remove the
    #    resident records.
    room_ids = HouseholdersTable.get_room_ids(table_handler, householder_id)
    for room_id in room_ids:
        old_count = HousesTable.get_householder_count(table_handler, room_id)
        # Clamp at zero so an already inconsistent counter cannot go negative.
        HousesTable.update_householder_count(table_handler, room_id, max(old_count - 1, 0))
    sqls = [
        "DELETE FROM householders WHERE householder_id = ?",
        "DELETE FROM householders_type WHERE householder_id = ?"
    ]
    table_handler.execute(sqls, [(householder_id,), (householder_id,)])
    return {"status": True}
@staticmethod
def delete_auth_user_info(table_handler: BaseTable, user_id: int):
    """Delete a general (non-resident) user together with their device authorizations.

    :param table_handler: database handler used for all queries and updates
    :param user_id: ID of the general user to delete
    :return: ``{"status": True}``
    :raises InvalidException: if the ID does not exist or the face removal
        request is rejected
    """
    # The ID must belong to an existing general user.
    user_known = HouseholdersTable.exists_user_id(table_handler, user_id)
    if not user_known:
        raise InvalidException("不存在对应通用用户ID")

    # Devices the user is currently authorized on.
    authorized = DevicesTable.get_auth_device_ids(table_handler, str(user_id))
    if authorized:
        # Revoke the face on those devices before touching the database.
        service = ServicesCall()
        removal = DelFaceItem(user_id=str(user_id), device_ids=str(authorized))
        ok, code, msg = service.del_face(authorized[0], removal)
        if not ok:
            raise InvalidException(f"住户人脸授权移除失败,错误码{code}{msg}")
        DevicesTable.delete_householder_all_auth(table_handler, str(user_id))

    # Remove the user row and any room associations.
    statements = [
        "DELETE FROM householders WHERE householder_id = ?",
        "DELETE FROM householders_type WHERE householder_id = ?"
    ]
    bindings = [(user_id,), (user_id,)]
    table_handler.execute(statements, bindings)
    return {"status": True}
@staticmethod
def delete_type(table_handler: BaseTable, householder_id: int):
    """Delete every ``householders_type`` row of the given householder ID.

    :param table_handler: database handler used to run the statement
    :param householder_id: ID whose association rows are removed
    """
    sql = """
        DELETE FROM householders_type
        WHERE householder_id = ?
        """
    params = (householder_id,)
    table_handler.execute(sql, params)
@staticmethod
def exists_householder_phone(table_handler: BaseTable, phone: str) -> bool:
    """Return whether a resident (``type = '住户'``) with this phone number exists.

    The phone number is passed through ``encrypt_number`` before it is
    compared with the stored column.
    """
    sql = """
        SELECT householder_id
        FROM householders
        WHERE type = '住户' AND phone = ?
        """
    table_handler.query(sql, (encrypt_number(phone),))
    # Any returned row means the phone number is already registered.
    return bool(table_handler.cursor.fetchall())
@staticmethod
def exists_user_phone(table_handler: BaseTable, phone: str):
    """Return whether a non-resident (``type != '住户'``) with this phone number exists.

    The phone number is passed through ``encrypt_number`` before it is
    compared with the stored column.
    """
    sql = """
        SELECT householder_id
        FROM householders
        WHERE type != '住户' AND phone = ?
        """
    table_handler.query(sql, (encrypt_number(phone),))
    # Any returned row means the phone number is already registered.
    return bool(table_handler.cursor.fetchall())
@staticmethod
def exists_id(table_handler: BaseTable, householder_id: int) -> bool:
    """Return whether any ``householders`` row has this ID, regardless of type."""
    sql = """
        SELECT householder_id
        FROM householders
        WHERE householder_id = ?
        """
    params = (householder_id,)
    table_handler.query(sql, params)
    # A non-empty result set means the ID is present.
    return bool(table_handler.cursor.fetchall())
@staticmethod
def exists_householder_id(table_handler: BaseTable, householder_id: int) -> bool:
    """Return whether a resident row (``type = '住户'``) has this ID."""
    sql = """
        SELECT householder_id
        FROM householders
        WHERE type = '住户' AND householder_id = ?
        """
    params = (householder_id,)
    table_handler.query(sql, params)
    # A non-empty result set means the resident exists.
    return bool(table_handler.cursor.fetchall())
@staticmethod
def exists_user_id(table_handler: BaseTable, user_id: int) -> bool:
    """Return whether a non-resident row (``type != '住户'``) has this ID."""
    sql = """
        SELECT householder_id
        FROM householders
        WHERE type != '住户' AND householder_id = ?
        """
    params = (user_id,)
    table_handler.query(sql, params)
    # A non-empty result set means the general user exists.
    return bool(table_handler.cursor.fetchall())
@staticmethod
def exists_householder_type(table_handler: BaseTable, householder_id: str, room_id: str) -> bool:
    """Return whether a householder-to-room association row exists.

    :param table_handler: database handler used to run the query
    :param householder_id: householder side of the association
    :param room_id: room side of the association
    """
    sql = """
        SELECT householder_id
        FROM householders_type
        WHERE householder_id = ? AND room_id = ?
        """
    params = (householder_id, room_id)
    table_handler.query(sql, params)
    # A non-empty result set means the pair is already linked.
    return bool(table_handler.cursor.fetchall())