# -*- coding:utf-8 -*-
"""
@Author   : xuxingchen
@Contact  : xuxingchen@sinochem.com
@Desc     : Subsystem house (property) information table
"""
import csv
import json
import os
import time
from typing import Optional

from pydantic import BaseModel, Field, field_validator

from models.spaces import SpacesTable
from utils.database import BaseTable, get_table_handler
from utils.misc import now_datetime_second, InvalidException, generate_captcha_text


class HouseDetailInfo(BaseModel):
    """Payload describing one subsystem house record to insert or update.

    ``subsystem_extend`` must carry a non-null ``label`` entry; otherwise
    validation raises ``InvalidException``.
    """

    project_id: str
    subsystem_id: str
    subsystem_name: str
    subsystem_data: str
    createby_id: str
    subsystem_extend: dict
    # Declared Optional without a default: under pydantic v2 semantics these
    # keys must be present in the input but may be null.
    create_time: Optional[int]
    corp_id: Optional[str]
    # Populated from the input key "_id" (alias); no default is supplied.
    message_id: Optional[str] = Field(alias="_id")
    # "insert" selects the INSERT path in the table helper; any other value
    # is treated as an update there.
    action: str = "insert"
    house_id: Optional[str] = None
    house_name: Optional[str] = None

    @field_validator("subsystem_extend")
    @classmethod
    def check_subsystem_extend(cls, value):
        """Reject a ``subsystem_extend`` dict whose ``label`` is missing or null.

        Returns the value unchanged when the check passes.
        Raises ``InvalidException`` otherwise.
        """
        if value.get("label", None) is None:
            raise InvalidException("subsystem_extend 中 label 值缺失")
        return value

    # NOTE: the two validators below are disabled and kept as commented-out
    # code for reference only.
    # @field_validator("house_id")
    # def check_house_id(cls, value, values):
    #     th = get_table_handler()
    #     if value is None or value == "":
    #         label = values.data.get("subsystem_extend")["label"]
    #         space_info = SpacesTable.get_space_info_by_sub_space_name(th, label)
    #         if space_info is None:
    #             raise InvalidException(f"subsystem_extend - label:{label} 不在映射表中")
    #         if SubsystemTable.exits_room_id(th, space_info["room_id"]):
    #             raise InvalidException(f"house_id:{space_info['room_id']} 记录已存在,禁止插入操作")
    #         return space_info["room_id"]
    #     if not SpacesTable.exits_room_id(th, value):
    #         raise InvalidException("映射表中不存在对应的 house_id")
    #     else:
    #         return value

    # @field_validator("house_name")
    # def check_house_name(cls, value, values):
    #     th = get_table_handler()
    #     if value is None:
    #         label = values.data.get("subsystem_extend")["label"]
    #         space_info = SpacesTable.get_space_info_by_sub_space_name(th, label)
    #         if space_info is None:
    #             raise InvalidException(f"subsystem_extend - label:{label} 不在映射表中")
    #         if SubsystemTable.exits_room_id(th, space_info["room_id"]):
    #             raise
# NOTE: remainder of the disabled ``check_house_name`` validator (commented-out code, kept for reference).
    #             InvalidException(f"house_id:{space_info['room_id']} 记录已存在,禁止插入操作")
    #         return space_info["house_name"]
    #     return value


class SubsystemTable(BaseTable):
    """Static helpers that create, seed, read and write the ``subsystem`` table.

    Every method receives the table handler explicitly and runs its SQL
    through the handler's ``query`` / ``execute`` / ``executemany`` methods.
    """

    @staticmethod
    def check(table_handler: BaseTable):
        """Create the ``subsystem`` table when it is missing and seed it from CSV.

        Nothing happens when the table already exists. After creating the
        table, rows from ``data/InitialData/subsystem.csv`` are inserted if
        that file exists and its header has exactly 10 columns.
        """
        table_handler.query("SELECT name FROM sqlite_master WHERE type='table' AND name='subsystem'")
        if table_handler.cursor.fetchone() is not None:
            return

        table_handler.execute(
            """
            CREATE TABLE subsystem (
                room_id TEXT UNIQUE,
                subsystem_id TEXT,
                subsystem_project_id TEXT,
                sub_space_name TEXT,
                subsystem_name TEXT,
                subsystem_data TEXT,
                subsystem_extend TEXT,
                createby_id TEXT,
                corp_id TEXT,
                _id TEXT,
                create_time INTEGER,
                update_time INTEGER,
                update_datetime TEXT,
                PRIMARY KEY (subsystem_id, subsystem_project_id, sub_space_name)
            )
            """
        )

        # NOTE(review): "__file__" is a string literal here, not the module
        # attribute, so the path resolves against the current working
        # directory. Left unchanged because switching to the real __file__
        # would move the lookup location -- confirm which one is intended.
        init_config_path = os.path.join(os.path.dirname(os.path.abspath("__file__")),
                                        "data/InitialData/subsystem.csv")
        if not os.path.exists(init_config_path):
            return

        seed_rows = SubsystemTable._load_initial_rows(init_config_path)
        if seed_rows:
            table_handler.executemany(
                """
                INSERT INTO subsystem
                (room_id, subsystem_project_id, sub_space_name, subsystem_name, subsystem_data,
                 subsystem_extend, createby_id, corp_id, _id, create_time, update_time, update_datetime)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                ON CONFLICT (room_id) DO NOTHING
                """,
                seed_rows
            )

    @staticmethod
    def _load_initial_rows(csv_path: str) -> list:
        """Read seed tuples for ``subsystem`` from the CSV file at ``csv_path``.

        Returns an empty list when the file is empty or its header does not
        have exactly 10 columns. Blank lines are skipped. Each tuple holds the
        first nine cells (stripped), the tenth cell converted to ``int``
        (create_time), then the update timestamp in milliseconds and the
        update datetime string.
        """
        with open(csv_path, newline='', encoding='utf8') as csvfile:
            csvreader = csv.reader(csvfile)
            # A default avoids StopIteration on a completely empty file.
            head = next(csvreader, None)
            if head is None or len(head) != 10:
                return []

            # One timestamp pair for the whole batch keeps the rows consistent.
            update_datetime = now_datetime_second()
            update_time = int(time.time() * 1000)

            rows = []
            for row in csvreader:
                if not row:
                    # csv.reader yields [] for blank lines.
                    continue
                cells = [cell.strip() for cell in row[:10]]
                rows.append((*cells[:9], int(cells[9]), update_time, update_datetime))
            return rows

    @staticmethod
    def _parse_subsystem_extend(raw) -> dict:
        """Decode the text stored in the ``subsystem_extend`` column into a dict.

        The insert/update path stores ``str(dict)`` (a Python literal), while
        other sources may hold real JSON, so both are accepted:

        1. ``json.loads`` on the raw text;
        2. ``ast.literal_eval`` (handles single quotes, ``None``, ``True``,
           ``False`` and apostrophes inside values);
        3. the legacy quote/``None`` substitution followed by ``json.loads``,
           which raises as before when the text is unparsable.

        NULL or empty text yields an empty dict.
        """
        import ast  # local import: only this helper needs it

        if not raw:
            return {}

        try:
            parsed = json.loads(raw)
        except ValueError:
            parsed = None
        if isinstance(parsed, dict):
            return parsed

        try:
            parsed = ast.literal_eval(raw)
        except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError):
            parsed = None
        if isinstance(parsed, dict):
            return parsed

        return json.loads(raw.replace("'", '"').replace("None", "null"))

    @staticmethod
    def get_house_detail_info_by_project_id(table_handler: BaseTable, project_id: str) -> list:
        """Return the subsystem house records of one project, newest first.

        Rows come from ``subsystem`` LEFT JOINed with ``houses`` on
        ``room_id`` and are ordered by ``create_time`` descending. Each row is
        returned as a dict; ``subsystem_extend`` is decoded to a dict and its
        ``label`` entry is overwritten with the row's ``sub_space_name``.
        An empty list is returned when nothing matches.
        """
        table_handler.query(
            """
            SELECT subsystem.room_id, sub_space_name, house_name, subsystem_project_id, subsystem_id,
                   subsystem_name, subsystem_data, subsystem_extend, createby_id, corp_id, _id,
                   create_time, update_time
            FROM subsystem
            LEFT JOIN houses ON subsystem.room_id = houses.room_id
            WHERE subsystem_project_id = ?
            ORDER BY create_time DESC
            """,
            (project_id,)
        )
        rows = table_handler.cursor.fetchall()

        house_detail_info_list = []
        for info in rows or []:
            subsystem_extend = SubsystemTable._parse_subsystem_extend(info[7])
            subsystem_extend["label"] = info[1]
            house_detail_info_list.append({
                "house_id": info[0] if info[0] else "",
                # Also guard on the joined value itself: a room_id without a
                # matching houses row would otherwise leak None here.
                "house_name": info[2] if info[0] and info[2] else "",
                "project_id": info[3],
                "subsystem_id": info[4],
                "subsystem_name": info[5],
                "subsystem_data": info[6],
                "subsystem_extend": subsystem_extend,
                "createby_id": info[8],
                "corp_id": info[9],
                # NOTE(review): the stored _id column (info[10]) is not
                # returned; a newly generated value is used -- confirm intent.
                "_id": generate_captcha_text(16).lower(),
                "create_time": info[11],
                "update_time": info[12],
                "is_stop": False,
                "valid": True,
                "ids": None
            })
        return house_detail_info_list

    @staticmethod
    def add_sub_system_house_info(table_handler: BaseTable, obj: HouseDetailInfo):
        """Insert or update one house record described by ``obj``.

        When ``obj.action == "insert"`` the row is written with
        ``INSERT OR IGNORE`` (``room_id`` is not set by this statement).
        Any other action updates the row whose ``room_id`` equals
        ``obj.house_id``. ``subsystem_extend`` is stored as ``str(dict)``.
        """
        nt = now_datetime_second()
        # Shared by both statements; the column order is identical.
        values = (obj.subsystem_id, obj.project_id, obj.subsystem_extend["label"], obj.subsystem_name,
                  obj.subsystem_data, str(obj.subsystem_extend), obj.createby_id, obj.corp_id,
                  obj.message_id, obj.create_time, int(time.time() * 1000), nt)

        if obj.action == "insert":
            table_handler.execute(
                """
                INSERT OR IGNORE INTO subsystem
                (subsystem_id, subsystem_project_id, sub_space_name, subsystem_name, subsystem_data,
                 subsystem_extend, createby_id, corp_id, _id, create_time, update_time, update_datetime)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """,
                values
            )
        else:
            table_handler.execute(
                """
                UPDATE subsystem
                SET subsystem_id = ?, subsystem_project_id = ?, sub_space_name = ?, subsystem_name = ?,
                    subsystem_data = ?, subsystem_extend = ?, createby_id = ?, corp_id = ?, _id = ?,
                    create_time = ?, update_time = ?, update_datetime = ?
                WHERE room_id = ?
                """,
                values + (obj.house_id,)
            )

    @staticmethod
    def exits_room_id(table_handler: BaseTable, room_id: str) -> bool:
        """Return True when a ``subsystem`` row with this ``room_id`` exists."""
        table_handler.query("SELECT room_id FROM subsystem WHERE room_id = ?", (room_id,))
        return table_handler.cursor.fetchone() is not None