# -*- coding:utf-8 -*-
"""
@Author   :   xuxingchen
@Contact  :   xuxingchen@sinochem.com
@Desc     :   Parking-lot records. When the tables are initialised they are
              seeded from the CSV files under data/InitialData.
"""
import csv
import os
import re
import time
from typing import List, Optional

from pydantic import BaseModel, Field, field_validator

from device.call import ServicesCall, UpdateFixedCardItem, VehicleRegistrationItem
from models.devices import DevicesTable
from utils.database import BaseTable, get_table_handler
from utils.misc import (now_datetime_second, InvalidException, now_tz_datetime, millisecond_timestamp2tz,
                        encrypt_number, decrypt_number)

# Vehicle categories accepted by the card request models.
_CAR_TYPES = ["业主车辆", "员工车辆", "访客车辆", "其他车辆"]

# Mobile-number format: one of the listed 3-digit prefixes followed by 8 digits.
_PHONE_PATTERN = re.compile(r'^(13[0-9]|14[01456879]|15[0-35-9]|16[2567]|17[0-8]|18[0-9]|19[0-35-9])\d{8}$')

# Columns of the cars_card listing that get_cars_info may filter by equality.
_CARS_INFO_EQ_COLUMNS = ("phone", "car_type", "start_datetime", "expire_datetime")


def _validate_car_type(value):
    """Return *value* if it is a known vehicle category, else raise InvalidException."""
    if value not in _CAR_TYPES:
        raise InvalidException(f"请提供正确的车辆类型:{_CAR_TYPES}")
    return value


def _validate_phone_format(value):
    """Return *value* if it matches the mobile-number pattern, else raise InvalidException."""
    if _PHONE_PATTERN.search(value) is None:
        raise InvalidException("请提供正确的手机号码")
    return value


def _strip_or_none(cell):
    """Strip a CSV cell and map an empty result to None."""
    text = cell.strip()
    return text if text else None


def _load_seed_rows(filename, expected_columns, build_row):
    """Read data/InitialData/<filename> and convert each data row with *build_row*.

    Returns an empty list when the file is missing, is empty, or its header does
    not have exactly *expected_columns* columns. Rows with fewer cells than the
    header (for example blank lines) are skipped instead of raising IndexError.
    """
    # "__file__" is a string literal here (as in the original code), so the
    # directory resolves against the current working directory, not this module.
    path = os.path.join(os.path.dirname(os.path.abspath("__file__")), "data/InitialData/" + filename)
    if not os.path.exists(path):
        return []
    with open(path, newline='', encoding='utf8') as csvfile:
        reader = csv.reader(csvfile)
        head = next(reader, None)
        if head is None or len(head) != expected_columns:
            return []
        return [build_row(row) for row in reader if len(row) >= expected_columns]


def _parkinglot_seed_row(row):
    """Build a parkinglots row: id, number, name, type, parkinglot_id, 7 optional columns, timestamp."""
    return (row[0].strip(), _strip_or_none(row[1]), row[2].strip(), row[3].strip(), row[4].strip(),
            *(_strip_or_none(cell) for cell in row[5:12]), now_datetime_second())


def _cars_card_seed_row(row):
    """Build a cars_card row; the phone number (column 4) is stored encrypted."""
    return (row[0].strip(), row[1].strip(), row[2].strip(), row[3].strip(), encrypt_number(row[4].strip()),
            row[5].strip(), row[6].strip(), now_datetime_second())


def _cars_auth_seed_row(row):
    """Build a cars_auth row with a registration ID derived from the current time."""
    return (row[0].strip(), row[1].strip(), row[2].strip(), str(int(time.time() * (10 ** 7))),
            now_datetime_second())


def _table_exists(table_handler, table_name):
    """Return True when sqlite_master lists a table called *table_name*."""
    table_handler.query("SELECT name FROM sqlite_master WHERE type='table' AND name=?", (table_name,))
    return table_handler.cursor.fetchone() is not None


class AddParkingLot(BaseModel):
    """Request body for showing (adding) a parking lot."""
    parkinglot_number: str = Field(description="车场编号")
    parkinglot_name: str = Field(description="车场名称")
    factory_name: Optional[str] = Field(None, description="供应商")

    @field_validator("parkinglot_number")
    @classmethod
    def check_parkinglot_number(cls, value):
        """Require a known parking-lot ID that has not been added yet."""
        th = get_table_handler()
        if not ParkinglotsTable.exists(th, value, 'parkinglot'):
            raise InvalidException("停车场编号无效")
        if ParkinglotsTable.is_number_added(th, value, 'parkinglot'):
            raise InvalidException("停车场编号已添加")
        return value


class AddCarsCard(BaseModel):
    """Request body for creating a monthly card with its plates."""
    car_type: str
    owner_name: str
    owner_phone: str
    start_datetime: str
    expire_datetime: str
    car_ids: List[str]
    associated_parking_space: str = ""

    @field_validator("car_type")
    @classmethod
    def check_car_type(cls, value):
        """Require one of the supported vehicle categories."""
        return _validate_car_type(value)

    @field_validator("owner_phone")
    @classmethod
    def check_owner_phone(cls, value):
        """Require a well-formed phone number that no existing card uses."""
        _validate_phone_format(value)
        th = get_table_handler()
        if ParkinglotsTable.exists_phone(th, value):
            raise InvalidException(f"车辆所有人电话号码已存在")
        return value

    @field_validator("car_ids")
    @classmethod
    def check_car_ids(cls, value):
        """Reject any plate that already has an unexpired authorization."""
        th = get_table_handler()
        for car_id in value:
            if ParkinglotsTable.get_car_id_auth_status(th, car_id)["auth_status"]:
                raise InvalidException(f"车牌号 {car_id} 已授权,请关联未授权的车牌号")
        return value


class UpdateCarsCard(BaseModel):
    """Request body for updating an existing monthly card."""
    auth_id: int
    car_type: str
    owner_name: str
    owner_phone: str
    start_datetime: str
    expire_datetime: str
    car_ids: List[str]
    associated_parking_space: str

    @field_validator("auth_id")
    @classmethod
    def check_auth_id(cls, value):
        """Require an authorization ID that exists in cars_card."""
        th = get_table_handler()
        if not ParkinglotsTable.exists_auth_id(th, value):
            raise InvalidException(f"授权ID {value} 不存在")
        return value

    @field_validator("car_type")
    @classmethod
    def check_car_type(cls, value):
        """Require one of the supported vehicle categories."""
        return _validate_car_type(value)

    @field_validator("start_datetime")
    @classmethod
    def check_datetime(cls, value):
        """Accept values containing both 'T' and 'Z', or 'YYYY-MM-DD HH:MM'.

        Raises InvalidException for any other format. (The original returned
        the exception object, which silently became the field value.)
        """
        if "T" in value and "Z" in value:
            return value
        if re.match(r'^\d{4}-\d{2}-\d{2} \d{2}:\d{2}$', value):
            return value
        raise InvalidException(f"请提供正确的时间格式:YYYY-MM-DD HH:MM")

    @field_validator("owner_phone")
    @classmethod
    def check_owner_phone(cls, value):
        """Require a well-formed phone number."""
        return _validate_phone_format(value)


class GetCarsInfo(BaseModel):
    """Query parameters for the card listing."""
    search_type: str
    search_key: str
    page: Optional[int] = None
    limit: Optional[int] = None

    @field_validator("search_type")
    @classmethod
    def check_search_type(cls, value):
        """Map the display name of a search type to its column name."""
        types = {
            "车牌号码": "car_id",
            "车辆所有人": "owner",
            "联系电话": "phone"
        }
        if value not in types:
            raise InvalidException(f"请提供正确的搜索类型 {list(types.keys())}")
        return types[value]


class ParkinglotsTable(BaseTable):
    """Static helpers over the parkinglots, cars_card and cars_auth tables."""

    @staticmethod
    def check(table_handler: BaseTable):
        """Create the three tables when missing, seed them from CSV, and purge expired cards.

        NOTE(review): the source had lost its indentation; seeding is placed
        inside the "table was just created" branch - confirm against history.
        """
        # parkinglots: parking-lot / area / channel / gate records
        if not _table_exists(table_handler, "parkinglots"):
            table_handler.execute(
                """
                CREATE TABLE parkinglots (
                    id TEXT,
                    number TEXT,
                    name TEXT,
                    type TEXT,
                    parkinglot_id TEXT,
                    area_id TEXT,
                    channel_id TEXT,
                    factory_name TEXT,
                    channel_type TEXT,
                    is_online TEXT,
                    gate_status TEXT,
                    running_status TEXT,
                    update_datetime TEXT,
                    PRIMARY KEY (id, type)
                )
                """
            )
            data = _load_seed_rows("parkinglots.csv", 12, _parkinglot_seed_row)
            if data:
                table_handler.executemany(
                    """
                    INSERT OR IGNORE INTO parkinglots
                    (id, number, name, type, parkinglot_id, area_id, channel_id, factory_name,
                     channel_type, is_online, gate_status, running_status, update_datetime)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                    """,
                    data
                )
        # cars_card: monthly-card records
        if not _table_exists(table_handler, "cars_card"):
            table_handler.execute(
                """
                CREATE TABLE cars_card (
                    auth_id INTEGER PRIMARY KEY AUTOINCREMENT,
                    card_id TEXT UNIQUE,
                    car_type TEXT,
                    owner TEXT,
                    phone TEXT UNIQUE,
                    start_datetime TEXT,
                    expire_datetime TEXT,
                    associated_parking_space TEXT,
                    update_datetime TEXT
                )
                """
            )
            data = _load_seed_rows("cars_card.csv", 7, _cars_card_seed_row)
            if data:
                table_handler.executemany(
                    """
                    INSERT OR IGNORE INTO cars_card
                    (auth_id, card_id, car_type, owner, phone, start_datetime, expire_datetime, update_datetime)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                    """,
                    data
                )
        # cars_auth: plates attached to each authorization
        if not _table_exists(table_handler, "cars_auth"):
            table_handler.execute(
                """
                CREATE TABLE cars_auth (
                    auth_id INTEGER,
                    car_id TEXT,
                    car_type TEXT,
                    registration_id TEXT,
                    update_datetime TEXT,
                    PRIMARY KEY (auth_id, car_id)
                )
                """
            )
            data = _load_seed_rows("cars_auth.csv", 3, _cars_auth_seed_row)
            if data:
                table_handler.executemany(
                    """
                    INSERT OR IGNORE INTO cars_auth
                    (auth_id, car_id, car_type, registration_id, update_datetime)
                    VALUES (?, ?, ?, ?, ?)
                    """,
                    data
                )
        # Remove expired authorizations from cars_card and cars_auth.
        # The last three characters of the timestamp are dropped before comparing.
        now_datetime = now_datetime_second()[:-3]
        table_handler.query("SELECT auth_id FROM cars_card WHERE expire_datetime < ?", (now_datetime,))
        for expired in table_handler.cursor.fetchall():
            table_handler.execute("DELETE FROM cars_card WHERE auth_id = ?", (expired[0],))
            table_handler.execute("DELETE FROM cars_auth WHERE auth_id = ?", (expired[0],))

    @staticmethod
    def get_parkinglots_id(table_handler: BaseTable, number: str, _type: str):
        """Return the id of the parkinglots row with the given number and type.

        Raises InvalidException when no such row exists.
        """
        table_handler.query("SELECT id from parkinglots WHERE number = ? AND type = ?", (number, _type))
        res = table_handler.cursor.fetchall()
        if not res:
            raise InvalidException(f"{_type} 中不存在编码 {number} ")
        return res[0][0]

    @staticmethod
    def get_auth_id_by_card_id(table_handler: BaseTable, card_id: str):
        """Return the auth_id of the card *card_id*; raise InvalidException if absent."""
        table_handler.query("SELECT auth_id from cars_card WHERE card_id = ?", (card_id,))
        res = table_handler.cursor.fetchall()
        if not res:
            raise InvalidException(f"月卡ID {card_id} 不存在 ")
        return res[0][0]

    @staticmethod
    def get_parkinglots_info(table_handler: BaseTable):
        """List parking lots whose running_status is set, as {"info": [...]}."""
        table_handler.query(
            """
            SELECT id, name, factory_name FROM parkinglots
            WHERE type = 'parkinglot' AND running_status is not null
            """
        )
        info = [
            {"number": row[0], "name": row[1], "factory_name": row[2] if row[2] else ""}
            for row in table_handler.cursor.fetchall()
        ]
        return {"info": info}

    @staticmethod
    def get_showed_parkinglot_area_info(table_handler: BaseTable):
        """List areas of shown parking lots with their channel count and channel names."""
        table_handler.query(
            """
            SELECT area_number, name, parkinglot_name, '逻辑区域' as area_type, pc.channel_count, pc.channel_name
            FROM (SELECT id area_id, number area_number, name, parkinglot_name
                  FROM parkinglots pa
                  LEFT JOIN (SELECT id parkinglot_number, name parkinglot_name FROM parkinglots
                             WHERE type = 'parkinglot' AND running_status IS NOT NULL)
                  ON parkinglot_number = pa.parkinglot_id
                  WHERE type = 'area' AND parkinglot_number IS NOT NULL) pa
            LEFT JOIN (SELECT area_id, count(id) channel_count, GROUP_CONCAT(name) channel_name
                       FROM parkinglots WHERE type = 'channel' GROUP BY area_id) pc
            ON pc.area_id = pa.area_id
            """
        )
        keys = ("area_number", "area_name", "parkinglot_name", "area_type", "channel_count", "channel_name")
        return {"info": [dict(zip(keys, row[:6])) for row in table_handler.cursor.fetchall()]}

    @staticmethod
    def get_showed_parkinglot_channel_info(table_handler: BaseTable):
        """List channels that belong to shown parking lots."""
        table_handler.query(
            """
            SELECT number, name, channel_type, parkinglot_name
            FROM parkinglots pa
            LEFT JOIN (SELECT id parkinglot_number, name parkinglot_name FROM parkinglots
                       WHERE type = 'parkinglot' AND running_status IS NOT NULL)
            ON parkinglot_number = pa.parkinglot_id
            WHERE type = 'channel' AND parkinglot_number IS NOT NULL
            """
        )
        keys = ("channel_number", "channel_name", "channel_type", "parkinglot_name")
        return {"info": [dict(zip(keys, row[:4])) for row in table_handler.cursor.fetchall()]}

    @staticmethod
    def get_showed_parkinglot_gate_info(table_handler: BaseTable):
        """List gates of shown parking lots together with their channel name."""
        table_handler.query(
            """
            SELECT number, name, is_online, gate_status, running_status, parkinglot_name, channel_name
            FROM parkinglots pa
            LEFT JOIN (SELECT id parkinglot_number, name parkinglot_name FROM parkinglots
                       WHERE type = 'parkinglot' AND running_status IS NOT NULL)
            ON parkinglot_number = pa.parkinglot_id
            LEFT JOIN (SELECT id channel_id, name channel_name FROM parkinglots WHERE type = 'channel') pc
            ON pc.channel_id = pa.channel_id
            WHERE type = 'gate' AND parkinglot_number IS NOT NULL
            """
        )
        info = []
        for row in table_handler.cursor.fetchall():
            info.append({
                "gate_number": row[0],
                "gate_name": row[1],
                "is_online": row[2],
                "gate_status": row[3],
                "running_status": row[4],
                "parkinglot_name": row[5],
                "channel_name": row[6] if row[6] else ""
            })
        return {"info": info}

    @staticmethod
    def get_cars_info(table_handler: BaseTable, search_type: Optional[str], search_key: Optional[str],
                      page: Optional[int] = None, limit: Optional[int] = None):
        """Return one page of monthly cards plus the total number of matches.

        "owner" and "car_id" are substring searches; the remaining supported
        columns are compared for equality. All values are passed as bound
        parameters. Raises InvalidException for an unsupported search_type.
        Returns {"info": [...], "total": int}.
        """
        where_sql = ""
        where_params = ()
        if search_type is not None:
            if search_type == "owner":
                where_sql, where_params = "WHERE owner LIKE ?", (f"%{search_key}%",)
            elif search_type == "car_id":
                where_sql, where_params = "WHERE car_ids LIKE ?", (f"%{search_key}%",)
            elif search_type in _CARS_INFO_EQ_COLUMNS:
                # The column name comes from a fixed whitelist, so formatting it in is safe.
                # NOTE(review): phone is stored via encrypt_number(), yet the key is
                # compared as given (unchanged from the original) - confirm intent.
                where_sql, where_params = f"WHERE {search_type} = ?", (str(search_key),)
            else:
                raise InvalidException(f"不支持的搜索类型 {search_type}")
        query_sql = f"""
            SELECT cars_card.auth_id, car_ids, car_type, owner, phone, start_datetime, expire_datetime
            FROM cars_card
            LEFT JOIN (SELECT auth_id, GROUP_CONCAT(car_id) car_ids FROM cars_auth GROUP BY auth_id) ca
            ON ca.auth_id = cars_card.auth_id
            {where_sql}
        """
        table_handler.query(f"SELECT COUNT(*) FROM ({query_sql})", where_params)
        total = table_handler.cursor.fetchall()[0][0]
        if page is not None and limit is not None:
            table_handler.query(f"{query_sql} LIMIT ? OFFSET ?", where_params + (limit, (page - 1) * limit))
        else:
            table_handler.query(query_sql, where_params)
        info = []
        for row in table_handler.cursor.fetchall():
            info.append({
                "auth_id": row[0],
                "car_ids": row[1],
                "car_type": row[2],
                "owner_name": row[3],
                "owner_phone": decrypt_number(row[4]),
                "start_datetime": row[5],
                "expire_datetime": row[6]
            })
        return {"info": info, "total": total}

    @staticmethod
    def get_auth_info(table_handler: BaseTable, auth_id: int):
        """Return the card details and plate list for *auth_id*.

        Raises InvalidException when the authorization does not exist.
        """
        table_handler.query(
            """
            SELECT cars_card.auth_id, car_type, owner, phone, start_datetime, expire_datetime, car_ids,
                   associated_parking_space
            FROM cars_card
            LEFT JOIN (SELECT auth_id, GROUP_CONCAT(car_id) car_ids FROM cars_auth GROUP BY auth_id) ca
            ON ca.auth_id = cars_card.auth_id
            WHERE cars_card.auth_id = ?
            """,
            (auth_id,)
        )
        res = table_handler.cursor.fetchall()
        if not res:
            raise InvalidException("授权ID不存在")
        row = res[0]
        # The LEFT JOIN yields NULL when the card has no plates attached.
        car_ids = row[6].split(',') if row[6] else []
        return {
            "auth_id": row[0],
            "car_type": row[1],
            "owner_name": row[2],
            "owner_phone": decrypt_number(row[3]),
            "start_datetime": row[4],
            "expire_datetime": row[5],
            "car_ids": car_ids,
            "car_count": len(car_ids),
            "associated_parking_space": row[7] if row[7] else ''
        }

    @staticmethod
    def get_car_id_auth_status(table_handler: BaseTable, car_id: str):
        """Return {"auth_status": True} when the plate has an unexpired card."""
        now_datetime = now_datetime_second()[:-3]
        table_handler.query(
            """
            SELECT cars_auth.auth_id, car_id, start_datetime, expire_datetime
            FROM cars_auth
            LEFT JOIN cars_card ON cars_card.auth_id = cars_auth.auth_id
            WHERE car_id = ? AND expire_datetime > ?
            """,
            (car_id, now_datetime)
        )
        return {"auth_status": bool(table_handler.cursor.fetchall())}

    @staticmethod
    def get_auth_info_by_auth_id(table_handler: BaseTable, auth_id):
        """Collect card fields, authorized device IDs, plates and registration IDs.

        Returns the tuple (card_id, owner_name, owner_phone, car_type,
        device_ids, car_ids, reg_ids). Missing scalar values are None and
        missing lists are empty; the phone number is returned decrypted.
        """
        table_handler.query(
            """
            SELECT
                (SELECT card_id FROM cars_card WHERE auth_id = ?) AS card_id,
                (SELECT owner FROM cars_card WHERE auth_id = ?) AS owner_name,
                (SELECT phone FROM cars_card WHERE auth_id = ?) AS owner_phone,
                (SELECT car_type FROM cars_card WHERE auth_id = ?) AS car_type,
                (SELECT GROUP_CONCAT(device_id) FROM devices_auth
                 WHERE _id = ? AND record_type = '车行') AS device_ids,
                (SELECT GROUP_CONCAT(car_id) FROM cars_auth WHERE auth_id = ?) AS car_ids,
                (SELECT GROUP_CONCAT(cars_auth.registration_id) FROM cars_auth WHERE auth_id = ?) AS reg_ids
            """,
            (auth_id,) * 7
        )
        res = table_handler.cursor.fetchall()
        card_id, owner_name, owner_phone, car_type = None, None, None, None
        device_ids, car_ids, reg_ids = [], [], []
        if res:
            row = res[0]
            card_id = row[0]
            owner_name = row[1]
            if row[2] is not None:
                owner_phone = decrypt_number(row[2])
            car_type = row[3]
            if row[4] is not None:
                device_ids = row[4].split(',')
            if row[5] is not None:
                car_ids = row[5].split(',')
            if row[6] is not None:
                reg_ids = row[6].split(',')
        return card_id, owner_name, owner_phone, car_type, device_ids, car_ids, reg_ids

    @staticmethod
    def get_card_id_by_auth_id(table_handler: BaseTable, auth_id: int):
        """Return the card_id of *auth_id*; raise InvalidException if absent."""
        table_handler.query("SELECT card_id FROM cars_card WHERE auth_id = ?", (auth_id,))
        res = table_handler.cursor.fetchall()
        if not res:
            raise InvalidException(f"授权ID {auth_id} 不存在")
        return res[0][0]

    @staticmethod
    def get_car_card_interval(table_handler: BaseTable, card_id: str):
        """Return (start_datetime, expire_datetime) of the card *card_id*."""
        table_handler.query("SELECT start_datetime, expire_datetime FROM cars_card WHERE card_id = ?", (card_id,))
        res = table_handler.cursor.fetchall()
        if not res:
            raise InvalidException(f"月卡ID {card_id} 未查询到对应有效期记录")
        return res[0][0], res[0][1]

    @staticmethod
    def get_auth_id_interval(table_handler: BaseTable, auth_id: int):
        """Return (start_datetime, expire_datetime) of the authorization *auth_id*."""
        table_handler.query("SELECT start_datetime, expire_datetime FROM cars_card WHERE auth_id = ?", (auth_id,))
        res = table_handler.cursor.fetchall()
        if not res:
            raise InvalidException(f"授权ID {auth_id} 未查询到对应有效期记录")
        return res[0][0], res[0][1]

    @staticmethod
    def get_car_reg_id(table_handler: BaseTable, car_id: str):
        """Return the registration ID stored for the plate *car_id*."""
        table_handler.query("SELECT registration_id FROM cars_auth WHERE car_id = ?", (car_id,))
        res = table_handler.cursor.fetchall()
        if not res:
            raise InvalidException(f"Car ID {car_id} 未查询到对应注册ID")
        return res[0][0]

    @staticmethod
    def add_update_cars_auth_record(table_handler: BaseTable, auth_id: int, car_ids: list, reg_ids: list):
        """Upsert one cars_auth row per plate; reg_ids[i] belongs to car_ids[i]."""
        cars_auth_data = []
        for i, car_id in enumerate(car_ids):
            # NOTE(review): 7-character plates are labelled "新能源"; this looks
            # inverted for the usual plate lengths - confirm before relying on car_type.
            car_type = "新能源" if len(car_id) == 7 else "普通"
            nt = now_datetime_second()
            cars_auth_data.append((auth_id, car_id, car_type, reg_ids[i], nt, reg_ids[i], nt))
        table_handler.executemany(
            """
            INSERT INTO cars_auth (auth_id, car_id, car_type, registration_id, update_datetime)
            VALUES (?, ?, ?, ?, ?)
            ON CONFLICT (auth_id, car_id) DO UPDATE SET registration_id = ?, update_datetime = ?
            """,
            cars_auth_data
        )

    @staticmethod
    def add_car_card(table_handler: BaseTable, card_id: str, device_ids: list, reg_ids: list,
                     obj: AddCarsCard | UpdateCarsCard):
        """Insert the card into cars_card, then record its plates and device authorizations."""
        table_handler.execute(
            """
            INSERT INTO cars_card
            (card_id, car_type, owner, phone, associated_parking_space,
             start_datetime, expire_datetime, update_datetime)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            """,
            (card_id, obj.car_type, obj.owner_name, encrypt_number(obj.owner_phone), obj.associated_parking_space,
             obj.start_datetime, obj.expire_datetime, now_datetime_second())
        )
        auth_id = ParkinglotsTable.get_auth_id_by_card_id(table_handler, card_id)
        # cars_auth
        ParkinglotsTable.add_update_cars_auth_record(table_handler, auth_id, obj.car_ids, reg_ids)
        # devices_auth
        st = obj.start_datetime
        et = obj.expire_datetime
        for device_id in device_ids:
            DevicesTable.add_update_device_auth(table_handler, device_id, [auth_id], "车行", st, et)

    @staticmethod
    def add_car_card_auth(table_handler: BaseTable, obj: AddCarsCard):
        """Register the plates on every parking device, create the card, and store the records.

        If any device rejects a registration or the card, everything already
        applied on earlier devices is rolled back and InvalidException is raised.
        Returns {"status": True} on success.

        NOTE(review): returns None when no parking device is found; the collapsed
        source did not show whether the return sat inside the device branch.
        """
        # 1. Look up the device IDs of the parking system.
        device_ids = DevicesTable.get_device_ids(table_handler, filter_name="停车")
        if len(device_ids) > 0:
            sc = ServicesCall()
            vri = VehicleRegistrationItem(
                car_plate_no="",
                registration_id="",
                owner_id=obj.owner_phone,
                owner_name=obj.owner_name,
                registration_time=now_tz_datetime(),
                begin_time=obj.start_datetime,
                action="insert",
                registration_type='0' if "业主" in obj.car_type else '1'
            )
            # One registration ID per plate, shared by all devices. The original
            # used "base + 1" for every plate, so all plates got the same ID.
            base_reg_id = int(time.time() * (10 ** 7))
            reg_ids = [str(base_reg_id + offset) for offset in range(len(obj.car_ids))]
            registered_devices = []  # devices holding at least one new registration

            def rollback_registrations():
                """Delete every plate of this request from the devices registered so far."""
                for done_device_id in registered_devices:
                    for j, plate in enumerate(obj.car_ids):
                        vri.car_plate_no = plate
                        vri.registration_id = reg_ids[j]
                        vri.action = "delete"
                        sc.vehicle_registration(done_device_id, vri)

            # 2. Register the plates; any failure rolls back all earlier successes.
            for device_id in device_ids:
                for car_id_index, car_id in enumerate(obj.car_ids):
                    vri.car_plate_no = car_id
                    vri.registration_id = reg_ids[car_id_index]
                    # Reset the action: a previous "UNIQUE" retry leaves it as "update".
                    vri.action = "insert"
                    _callback, code, msg = sc.vehicle_registration(device_id, vri)
                    if not _callback and "UNIQUE" not in msg:
                        rollback_registrations()
                        raise InvalidException(f"车辆注册失败,设备:{device_id},错误码:{code},{msg},已执行回滚")
                    if "UNIQUE" in msg:
                        # The plate already exists on the device: overwrite it instead.
                        vri.action = "update"
                        _callback, code, msg = sc.vehicle_registration(device_id, vri)
                    if device_id not in registered_devices:
                        registered_devices.append(device_id)

            # 3. Create the monthly card; the same card ID is used on all devices.
            multi_car_id = '|'.join(obj.car_ids)
            t = str(int(time.time() * 1000))
            card_id = t[:8] + '-' + t[8:12] + '-' + t[12:] + "000-0000-000000000000"
            ufci = UpdateFixedCardItem(
                vehicle_owner_id=obj.owner_phone,
                vehicle_owner_name=obj.owner_name,
                vehicle_owner_phone=obj.owner_phone,
                plate_number=multi_car_id,
                effective_date_begin=obj.start_datetime,
                effective_date_end=obj.expire_datetime,
                action="insert",
                card_id=card_id
            )
            carded_devices = []
            for device_id in device_ids:
                _callback, code, msg = sc.update_fixed_card(device_id, ufci)
                if not _callback:
                    # Roll back the cards first, then the registrations.
                    ufci.action = "delete"
                    for done_device_id in carded_devices:
                        sc.update_fixed_card(done_device_id, ufci)
                    rollback_registrations()
                    raise InvalidException(f"车辆新建月卡失败,设备:{device_id},错误码:{code},已执行回滚")
                carded_devices.append(device_id)

            # 4. Store the authorization in cars_card, cars_auth and devices_auth.
            ParkinglotsTable.add_car_card(table_handler, card_id, device_ids, reg_ids, obj)
            return {"status": True}

    @staticmethod
    def delete_car_auth_id(table_handler: BaseTable, auth_id: int):
        """Delete the card on its devices and remove the matching local records.

        Returns {"status": True} when every device succeeds. When only some
        devices succeed, the card, its plates and the succeeded device records
        are removed and InvalidException is raised; when none succeed nothing is
        removed and InvalidException is raised. Also raises for an unknown auth_id.

        NOTE(review): when no device is attached to the authorization nothing is
        deleted and None is returned (unchanged from the original) - confirm.
        """
        # 1. The authorization must exist.
        if not ParkinglotsTable.exists_auth_id(table_handler, auth_id):
            raise InvalidException(f"授权ID {auth_id} 无效")
        # 2. Fetch the authorized devices and plates. Only the card is deleted on
        #    the devices; vehicle registrations are left in place for now.
        card_id, owner_name, owner_phone, car_type, device_ids, car_ids, _ = \
            ParkinglotsTable.get_auth_info_by_auth_id(table_handler, auth_id)
        start_date, expire_date = ParkinglotsTable.get_car_card_interval(table_handler, card_id)
        if len(device_ids) == 0:
            return None
        success_ids = []
        fail_ids = []
        fail_code = []
        fail_msg = []
        sc = ServicesCall()
        update_card_item = UpdateFixedCardItem(
            vehicle_owner_id=owner_phone,
            vehicle_owner_name=owner_name,
            vehicle_owner_phone=owner_phone,
            plate_number='|'.join(car_ids),
            effective_date_begin=start_date,
            effective_date_end=expire_date,
            action="delete",
            card_id=card_id
        )
        # 3. Remove the card from each device.
        for device_id in device_ids:
            _callback, code, msg = sc.update_fixed_card(device_id, update_card_item)
            if _callback:
                success_ids.append(device_id)
            else:
                fail_ids.append(device_id)
                fail_code.append(code)
                fail_msg.append(msg)
        # 4. Delete the cars_card, cars_auth and devices_auth records.
        if len(success_ids) == len(device_ids):
            # Every device succeeded.
            sqls = [
                "DELETE FROM cars_card WHERE auth_id = ?",
                "DELETE FROM cars_auth WHERE auth_id = ?",
                "DELETE FROM devices_auth WHERE _id = ? AND record_type = '车行'",
            ]
            params = [(auth_id,), (auth_id,), (auth_id,)]
            table_handler.execute(sqls, params)
            return {"status": True}
        if len(success_ids) > 0:
            sqls = [
                "DELETE FROM cars_card WHERE auth_id = ?",
                "DELETE FROM cars_auth WHERE auth_id = ?"
            ]
            _sql = "DELETE FROM devices_auth WHERE _id = ? AND device_id = ? AND record_type = '车行'"
            sqls += [_sql for _ in success_ids]
            params = [(auth_id,), (auth_id,)]
            params += [(auth_id, success_id) for success_id in success_ids]
            table_handler.execute(sqls, params)
            raise InvalidException(f"部分授权移除失败:设备ID {fail_ids}, 错误码: {fail_code}, {fail_msg}")
        raise InvalidException(f"授权移除失败:设备ID {fail_ids}, 错误码: {fail_code}, {fail_msg}")

    @staticmethod
    def delete_car_auth(table_handler: BaseTable, delete_car_list):
        """Delete the cars_auth rows of the given plates."""
        params = [(car_id,) for car_id in delete_car_list]
        table_handler.executemany("DELETE FROM cars_auth WHERE car_id = ?", params)

    @staticmethod
    def update_car_card(table_handler: BaseTable, auth_id: int, car_type: str, owner_name: str, owner_phone: str,
                        associated_parking_space: str, start_datetime: str, expire_datetime: str):
        """Update the cars_card row of *auth_id*; the phone number is stored encrypted."""
        table_handler.execute(
            """
            UPDATE cars_card
            SET car_type = ?, owner = ?, phone = ?, associated_parking_space = ?,
                start_datetime = ?, expire_datetime = ?, update_datetime = ?
            WHERE auth_id = ?
            """,
            (car_type, owner_name, encrypt_number(owner_phone), associated_parking_space,
             start_datetime, expire_datetime, now_datetime_second(), auth_id)
        )

    @staticmethod
    def update_car_auth(table_handler: BaseTable, device_ids: list, reg_ids: list, obj: UpdateCarsCard):
        """Upsert the plates of *obj* in cars_auth and refresh the device authorization dates."""
        # cars_auth
        ParkinglotsTable.add_update_cars_auth_record(table_handler, obj.auth_id, obj.car_ids, reg_ids)
        # devices_auth
        st = obj.start_datetime
        et = obj.expire_datetime
        for device_id in device_ids:
            DevicesTable.add_update_device_auth(table_handler, device_id, [str(obj.auth_id)], "车行", st, et)

    @staticmethod
    def update_parkinglot_show(table_handler: BaseTable, _id: str, name: Optional[str],
                               factory_name: Optional[str], is_show: bool):
        """Show or hide a parking lot and optionally rename it.

        running_status is set to 'show' or NULL. name and factory_name are
        changed only when not None; all values are passed as bound parameters.
        Raises InvalidException when the parking lot does not exist.
        """
        if not ParkinglotsTable.exists_number(table_handler, _id, 'parkinglot'):
            raise InvalidException(f"不存在编码 {_id} ")
        status = 'show' if is_show else None
        assignments = []
        params = []
        if name is not None:
            assignments.append("name = ?")
            params.append(name)
        if factory_name is not None:
            assignments.append("factory_name = ?")
            params.append(factory_name)
        assignments += ["running_status = ?", "update_datetime = ?"]
        params += [status, now_datetime_second(), _id]
        table_handler.execute(
            f"UPDATE parkinglots SET {', '.join(assignments)} WHERE type = 'parkinglot' AND id = ?",
            tuple(params)
        )
        return {"status": True}

    @staticmethod
    def update_car_card_auth(table_handler: BaseTable, obj: UpdateCarsCard):
        """Apply a card update on the devices and then in the local tables.

        Plates are split into those to insert, delete and update, each is sent to
        every authorized device, the card is updated on the devices, and finally
        cars_card / cars_auth / devices_auth are updated. Raises InvalidException
        when a device rejects the card update. Returns {"status": True}.

        NOTE(review): returns None when no parking device is found.
        """
        # 1. Look up the device IDs of the parking system.
        device_ids = DevicesTable.get_device_ids(table_handler, filter_name="停车")
        if len(device_ids) > 0:
            old_st, old_et = ParkinglotsTable.get_auth_id_interval(table_handler, obj.auth_id)
            # The first 13 characters of a registration ID are passed on as a
            # millisecond timestamp.
            # NOTE(review): raises if car_ids is empty or its first plate is not
            # registered yet (for example a newly added plate listed first).
            registration_time = millisecond_timestamp2tz(
                ParkinglotsTable.get_car_reg_id(table_handler, str(obj.car_ids[0]))[:13])
            sc = ServicesCall()
            vri = VehicleRegistrationItem(
                car_plate_no="",
                registration_id="",
                owner_id=obj.owner_phone,
                owner_name=obj.owner_name,
                registration_time=registration_time,
                begin_time=obj.start_datetime,
                action="",
                registration_type='0' if "业主" in obj.car_type else '1'
            )
            # 2. Compare with the current authorization. From here on device_ids
            #    holds the devices attached to this authorization.
            card_id, owner_name, owner_phone, car_type, device_ids, authed_car_list, authed_reg_ids = \
                ParkinglotsTable.get_auth_info_by_auth_id(table_handler, obj.auth_id)
            insert_car_list = [car_id for car_id in obj.car_ids if car_id not in authed_car_list]
            delete_car_list = [car_id for car_id in authed_car_list if car_id not in obj.car_ids]
            unchanged = (obj.start_datetime < now_datetime_second()[:-3] or obj.start_datetime == old_st) and \
                obj.expire_datetime == old_et and obj.owner_phone == owner_phone and \
                obj.owner_name == owner_name
            # When validity period, owner name and phone are unchanged, plates that
            # stay need no update; only added and removed plates are processed.
            update_car_list = [] if unchanged else [car_id for car_id in obj.car_ids if car_id in authed_car_list]

            # One registration ID per new plate, shared by all devices. The
            # original used "base + 1" for every plate.
            base_reg_id = int(time.time() * (10 ** 7))
            reg_ids = [str(base_reg_id + offset) for offset in range(len(insert_car_list))]
            old_registration_type = '0' if "业主" in (car_type or "") else '1'
            success_insert_vri = []
            success_delete_vri = []
            success_update_origin_vri = []
            fail_callback = False
            for device_id in device_ids:
                # 3. Register, delete and update plates on this device.
                for car_id_index, car_id in enumerate(insert_car_list):
                    vri.car_plate_no = car_id
                    vri.registration_id = reg_ids[car_id_index]
                    vri.action = "insert"
                    _callback, code, msg = sc.vehicle_registration(device_id, vri)
                    # NOTE(review): branch nesting reconstructed from a collapsed
                    # source. As written, a failed call whose message contains
                    # "UNIQUE" counts as a failure, unlike add_car_card_auth.
                    if _callback or "UNIQUE" not in msg:
                        if not _callback and "UNIQUE" not in msg:
                            update_car_list.append(car_id)
                        else:
                            if "UNIQUE" in msg:
                                vri.action = "update"
                                _callback, _, _ = sc.vehicle_registration(device_id, vri)
                            success_insert_vri.append([device_id, vri.copy()])
                    else:
                        fail_callback = True
                        break
                for car_id in delete_car_list:
                    vri.car_plate_no = car_id
                    vri.registration_id = ParkinglotsTable.get_car_reg_id(table_handler, car_id)
                    vri.action = "delete"
                    _callback, code, msg = sc.vehicle_registration(device_id, vri)
                    if _callback:
                        success_delete_vri.append([device_id, vri.copy()])
                    else:
                        fail_callback = True
                        break
                for car_id in update_car_list:
                    vri.car_plate_no = car_id
                    vri.registration_id = ParkinglotsTable.get_car_reg_id(table_handler, car_id)
                    vri.action = "update"
                    _callback, code, msg = sc.vehicle_registration(device_id, vri)
                    if _callback:
                        # Snapshot the previous values on a copy, so the shared
                        # request keeps the new values for the remaining plates.
                        origin_vri = vri.copy()
                        origin_vri.owner_id = owner_phone
                        origin_vri.owner_name = owner_name
                        origin_vri.begin_time = old_st
                        origin_vri.registration_time = registration_time
                        origin_vri.registration_type = old_registration_type
                        success_update_origin_vri.append([device_id, origin_vri])
                    else:
                        fail_callback = True
                        break
            # TODO: the rollback logic still needs to be verified.
            # NOTE(review): nothing is raised after rollback, so the card update
            # below still runs (unchanged from the original).
            if fail_callback:
                # Any failure rolls back everything that succeeded before.
                for insert_item in success_insert_vri:
                    insert_item[1].action = "delete"
                    sc.vehicle_registration(insert_item[0], insert_item[1])
                for delete_item in success_delete_vri:
                    delete_item[1].action = "insert"
                    sc.vehicle_registration(delete_item[0], delete_item[1])
                for update_item in success_update_origin_vri:
                    sc.vehicle_registration(update_item[0], update_item[1])

            # 4. Update the monthly card on the devices.
            multi_car_id = '|'.join(obj.car_ids)
            card_id = ParkinglotsTable.get_card_id_by_auth_id(table_handler, obj.auth_id)
            ufci = UpdateFixedCardItem(
                vehicle_owner_id=obj.owner_phone,
                vehicle_owner_name=obj.owner_name,
                vehicle_owner_phone=obj.owner_phone,
                plate_number=multi_car_id,
                effective_date_begin=obj.start_datetime,
                effective_date_end=obj.expire_datetime,
                action="update",
                card_id=card_id
            )
            for device_id in device_ids:
                _callback, code, msg = sc.update_fixed_card(device_id, ufci)
                if not _callback:
                    raise InvalidException(f"车辆更新月卡失败,设备:{device_id},错误码:{code},{msg}")

            # 5. Store the result in cars_card, cars_auth and devices_auth.
            ParkinglotsTable.update_car_card(table_handler, obj.auth_id, obj.car_type, obj.owner_name,
                                             obj.owner_phone, obj.associated_parking_space,
                                             obj.start_datetime, obj.expire_datetime)
            if insert_car_list:
                # Only new plates need new cars_auth rows; obj.car_ids is narrowed
                # so that it lines up with reg_ids.
                obj.car_ids = insert_car_list
                ParkinglotsTable.update_car_auth(table_handler, device_ids, reg_ids, obj)
            if delete_car_list:
                ParkinglotsTable.delete_car_auth(table_handler, delete_car_list)
            return {"status": True}

    @staticmethod
    def is_number_added(table_handler: BaseTable, number: str, _type: str):
        """Return True when the record exists and its running_status is set.

        Parking lots are matched on the id column, every other type on number.
        """
        number_col = "number" if _type != "parkinglot" else "id"
        table_handler.query(
            f"SELECT id FROM parkinglots WHERE {number_col} = ? and type = ? and running_status is not null",
            (number, _type))
        return bool(table_handler.cursor.fetchall())

    @staticmethod
    def exists(table_handler: BaseTable, _id: str, _type: str):
        """Return True when a parkinglots row with this id and type exists."""
        table_handler.query("SELECT id FROM parkinglots WHERE id = ? and type = ?", (_id, _type))
        return bool(table_handler.cursor.fetchall())

    @staticmethod
    def exists_number(table_handler: BaseTable, number: str, _type: str):
        """Return True when the record exists (id column for parking lots, number otherwise)."""
        number_col = "number" if _type != "parkinglot" else "id"
        table_handler.query(f"SELECT id FROM parkinglots WHERE {number_col} = ? and type = ?", (number, _type))
        return bool(table_handler.cursor.fetchall())

    @staticmethod
    def exists_phone(table_handler: BaseTable, phone: str):
        """Return True when a card is stored under the encrypted form of *phone*."""
        table_handler.query("SELECT auth_id FROM cars_card WHERE phone = ?", (encrypt_number(phone),))
        return bool(table_handler.cursor.fetchall())

    @staticmethod
    def exists_auth_id(table_handler: BaseTable, auth_id: int):
        """Return True when cars_card contains *auth_id*."""
        table_handler.query("SELECT auth_id FROM cars_card WHERE auth_id = ?", (auth_id,))
        return bool(table_handler.cursor.fetchall())