# -*- coding:utf-8 -*-
"""
@Author   : xuxingchen
@Contact  : xuxingchen@sinochem.com
@Desc     : Query and update helpers for the house (property) information table
"""
import csv
import json
import os
import traceback

from utils import logger
from utils.database import BaseTable
from utils.misc import now_datetime_second, extract_fixed_length_number


class HousesTable(BaseTable):
    """Static helpers that create, seed, query and update the ``houses`` table."""

    # Header line of the generated seed CSV; this text is written to disk verbatim.
    _CSV_HEADER = (
        "房屋ID, 房屋编号, 项目ID, 所属项目, 区域ID, 所属区域, 楼栋ID, 所属楼栋, 单元ID, 所属单元, 楼层ID, 所属楼层, 房间号, 住户数量\n"
    )
    # Number of columns a seed CSV must have for its rows to be imported.
    _CSV_COLUMN_COUNT = 14

    @staticmethod
    def check(table_handler: BaseTable) -> None:
        """Create the ``houses`` table if it does not exist and seed it.

        When the table is missing it is created, the seed CSV is regenerated
        from the exported room data (if that export exists) and the CSV rows
        are inserted. Nothing happens when the table already exists.

        Args:
            table_handler: Database handler used to run the statements.
        """
        table_handler.query("SELECT name FROM sqlite_master WHERE type='table' AND name='houses'")
        if table_handler.cursor.fetchone() is not None:
            return

        table_handler.execute(
            """
            CREATE TABLE houses (
                room_id TEXT UNIQUE,
                house_name TEXT UNIQUE,
                project_id TEXT,
                project_name TEXT,
                area_id TEXT,
                area_name TEXT,
                building_id TEXT,
                building_name TEXT,
                unit_id TEXT,
                unit_name TEXT,
                floor_id TEXT,
                floor_name TEXT,
                room_name TEXT,
                householder_count INTEGER,
                update_datetime TEXT
            )
            """
        )

        # Generate the built-in seed data from the exported data.
        # NOTE(review): "__file__" is a string literal here, so the base directory
        # is the current working directory rather than this module's directory.
        # Left unchanged because deployments may rely on it - confirm before fixing.
        base_dir = os.path.dirname(os.path.abspath("__file__"))
        init_config_path = os.path.join(base_dir, "data/InitialData/houses.csv")
        room_info_path = os.path.join(base_dir, "data/ExportData/room.txt")
        HousesTable._write_initial_csv(init_config_path, room_info_path)

        if os.path.exists(init_config_path):
            data = HousesTable._read_initial_rows(init_config_path)
            table_handler.executemany(
                """
                INSERT INTO houses
                (room_id, house_name, project_id, project_name, area_id, area_name,
                 building_id, building_name, unit_id, unit_name, floor_id, floor_name,
                 room_name, householder_count, update_datetime)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                ON CONFLICT (room_id) DO NOTHING
                """,
                data
            )

    @staticmethod
    def _write_initial_csv(init_config_path: str, room_info_path: str) -> None:
        """Rewrite the seed CSV from the exported room dump.

        The CSV is always truncated and given its header line; data rows are
        only written when ``room_info_path`` exists.

        Args:
            init_config_path: Path of the CSV file to (over)write.
            room_info_path: Path of the exported room dump to convert.
        """
        # The context manager closes the CSV even if parsing the export fails.
        with open(init_config_path, "w", encoding="utf8") as csv_file:
            csv_file.write(HousesTable._CSV_HEADER)
            if not os.path.exists(room_info_path):
                return
            with open(room_info_path, "r", encoding="utf8") as f:
                file_content = f.read()
            # Strip the ISODate(...) wrappers and switch to double quotes so the
            # dump can be parsed as JSON.
            file_content = file_content.replace("ISODate(", "").replace(")", "").replace("'", "\"")
            for item in json.loads(file_content):
                room_id = item.get("_id", "")
                project_id = item["project_id"]
                project = item["project_name"]
                area_id = item.get("area_id", "")
                area = item.get("area_name", "")
                building_id = item.get("build_id", "")
                building = item.get("building_name", "")
                unit_id = item.get("unit_id", "")
                unit = item.get("unit_name", "")
                floor_id = item.get("floor_id", "")
                floor = item.get("floor_name", "")
                room = item.get("num", "")
                # The house name is the non-empty location parts joined by '-'.
                house_name = '-'.join(
                    part for part in (project, area, building, unit, floor, room) if part != ''
                )
                wait_write = [
                    room_id, house_name, project_id, project, area_id, area,
                    building_id, building, unit_id, unit, floor_id, floor, room,
                    str(len(item.get("households", []))),
                ]
                # NOTE(review): fields are joined without CSV quoting, so a value
                # containing a comma would shift columns when read back.
                csv_file.write(', '.join(wait_write) + "\n")

    @staticmethod
    def _read_initial_rows(init_config_path: str) -> list[tuple]:
        """Parse the seed CSV into parameter tuples for the INSERT statement.

        Args:
            init_config_path: Path of the seed CSV file.

        Returns:
            One 15-item tuple per data row, or an empty list when the header
            does not have the expected 14 columns.
        """

        def optional(cell: str):
            """Return the stripped cell, or None when it is blank."""
            return cell.strip() or None

        data = []
        with open(init_config_path, newline='', encoding='utf8') as csvfile:
            csvreader = csv.reader(csvfile)
            head = next(csvreader, None)
            if head is None or len(head) != HousesTable._CSV_COLUMN_COUNT:
                return data
            for row in csvreader:
                if not row:
                    # csv.reader yields [] for blank lines; nothing to import.
                    continue
                data.append((
                    row[0].strip(),        # room_id
                    row[1].strip(),        # house_name
                    row[2].strip(),        # project_id
                    row[3].strip(),        # project_name
                    optional(row[4]),      # area_id
                    optional(row[5]),      # area_name
                    optional(row[6]),      # building_id
                    optional(row[7]),      # building_name
                    optional(row[8]),      # unit_id
                    optional(row[9]),      # unit_name
                    optional(row[10]),     # floor_id
                    optional(row[11]),     # floor_name
                    row[12].strip(),       # room_name
                    int(row[13].strip()),  # householder_count
                    now_datetime_second(),  # update_datetime
                ))
        return data

    @staticmethod
    def get_house_detail_info(table_handler: BaseTable, room_id) -> dict | None:
        """Return basic details of the house with the given room ID.

        Returns:
            A dict with keys ``id``, ``project_id``, ``name``, ``type``,
            ``subtype`` and ``project_name``, or None when no row matches.
        """
        table_handler.query(
            """
            SELECT room_id, project_id, room_name, project_name
            FROM houses
            WHERE room_id = ?
            """,
            (room_id,)
        )
        res = table_handler.cursor.fetchall()
        if not res:
            return None
        first = res[0]
        return {
            "id": first[0],
            "project_id": first[1],
            "name": first[2],
            "type": 1,
            "subtype": 0,
            "project_name": first[3]
        }

    @staticmethod
    def get_unit_id_by_room_id(table_handler: BaseTable, room_id):
        """Return the unit ID of the house with the given room ID, or None if not found."""
        table_handler.query(
            """
            SELECT unit_id
            FROM houses
            WHERE room_id = ?
            """,
            (room_id,)
        )
        res = table_handler.cursor.fetchall()
        return res[0][0] if res else None

    @staticmethod
    def get_house_ty_info(table_handler: BaseTable, project_id: str, room_id: str) -> dict | None:
        """Look up a house joined with its ``subsystem`` row.

        The ``ty_house_id`` value is the concatenation of the results of
        ``extract_fixed_length_number`` applied to the building, unit, floor
        and room names.

        Returns:
            A dict with keys ``id``, ``house_name``, ``ty_house_id``,
            ``project_id`` and ``corp_id``, or None when no row matches.
        """
        table_handler.query(
            """
            SELECT houses.room_id, house_name, project_id, corp_id,
                   building_name, unit_name, floor_name, room_name
            FROM houses
            LEFT JOIN subsystem ON houses.room_id = subsystem.room_id
            WHERE project_id = ? AND houses.room_id = ?
            """,
            (project_id, room_id)
        )
        res = table_handler.cursor.fetchall()
        if not res:
            return None
        first = res[0]
        building = extract_fixed_length_number(first[4])
        unit = extract_fixed_length_number(first[5])
        floor = extract_fixed_length_number(first[6])
        room = extract_fixed_length_number(first[7])
        ty_house_id = f"{building}{unit}{floor}{room}"
        return {
            "id": first[0],
            "house_name": first[1],
            "ty_house_id": ty_house_id,
            "project_id": first[2],
            "corp_id": first[3]
        }

    @staticmethod
    def get_items_by_dynamic_item(table_handler: BaseTable, query_target: str, query_item: dict) -> list[dict]:
        """List distinct ``<query_target>_id`` / ``<query_target>_name`` pairs.

        Args:
            table_handler: Database handler used to run the query.
            query_target: Column prefix to select, e.g. ``"building"`` selects
                ``building_id`` and ``building_name``.
            query_item: Mapping of column prefix to required ``<prefix>_id``
                value; all conditions are combined with AND. An empty mapping
                means no filtering.

        Returns:
            A list of ``{"_id": ..., "_name": ...}`` dicts, skipping rows whose
            id is NULL. An empty list is returned when nothing matches, when a
            column prefix is not a plain identifier, or when the query fails.
        """
        # Column names cannot be bound as SQL parameters, so only plain
        # identifiers are allowed into the statement text.
        prefixes = [query_target, *query_item]
        if not all(isinstance(prefix, str) and prefix.isidentifier() for prefix in prefixes):
            logger.Logger.error(f"ValueError, invalid column prefix in {prefixes}")
            return []

        conditions = []
        params = []
        for key, value in query_item.items():
            conditions.append(f"{key}_id = ?")
            # Values are bound instead of interpolated; non-int values are
            # compared as text, as they were when quoted inline.
            params.append(value if isinstance(value, int) else str(value))
        sub_sql = "WHERE " + " and ".join(conditions) if conditions else ""
        sql = f"""
            SELECT DISTINCT {query_target}_id, {query_target}_name
            FROM houses
            {sub_sql}
            """
        try:
            if params:
                table_handler.query(sql, tuple(params))
            else:
                table_handler.query(sql)
            res = table_handler.cursor.fetchall()
            return [{"_id": i[0], "_name": i[1]} for i in res if i[0] is not None]
        except Exception as e:
            # Best-effort lookup: report the failure and fall back to no results.
            logger.Logger.error(f"{type(e).__name__}, {e}")
            if logger.DEBUG:
                traceback.print_exc()
            return []

    @staticmethod
    def get_householder_count(table_handler: BaseTable, room_id: str):
        """Return the stored householder count for a room, or None if the room is unknown."""
        table_handler.query(
            """
            SELECT householder_count
            FROM houses
            WHERE room_id = ?
            """,
            (room_id,),
        )
        res = table_handler.cursor.fetchall()
        return res[0][0] if res else None

    @staticmethod
    def update_householder_count(table_handler: BaseTable, room_id: str, householder_count: int) -> bool:
        """Set the householder count of a room and refresh its update timestamp.

        Returns:
            Always True.
        """
        table_handler.execute(
            """
            UPDATE houses
            SET householder_count=?, update_datetime=?
            WHERE room_id=?
            """,
            (
                householder_count,
                now_datetime_second(),
                room_id
            ),
        )
        return True

    @staticmethod
    def get_unit_ids(table_handler: BaseTable, building_id: str) -> list:
        """Return the distinct unit IDs found in the given building (possibly empty)."""
        # Read-only statement: uses query() like every other SELECT in this class.
        table_handler.query(
            """
            SELECT DISTINCT unit_id
            FROM houses
            WHERE building_id = ?
            """,
            (building_id,)
        )
        res = table_handler.cursor.fetchall()
        return [i[0] for i in res]

    @staticmethod
    def exists(table_handler: BaseTable, room_id: str) -> bool:
        """Return True when a house with the given room ID exists."""
        table_handler.query(
            """
            SELECT house_name
            FROM houses
            WHERE room_id = ?
            """,
            (room_id,)
        )
        return bool(table_handler.cursor.fetchall())

    @staticmethod
    def building_exists(table_handler: BaseTable, building_id: str) -> bool:
        """Return True when at least one house belongs to the given building ID."""
        table_handler.query(
            """
            SELECT DISTINCT building_id
            FROM houses
            WHERE building_id = ?
            """,
            (building_id,)
        )
        return bool(table_handler.cursor.fetchall())

    @staticmethod
    def unit_exists(table_handler: BaseTable, unit_id: str) -> bool:
        """Return True when at least one house belongs to the given unit ID."""
        table_handler.query(
            """
            SELECT DISTINCT unit_id
            FROM houses
            WHERE unit_id = ?
            """,
            (unit_id,)
        )
        return bool(table_handler.cursor.fetchall())