history-project/SCLP/models/parkinglots.py
2024-10-24 09:23:39 +08:00

1020 lines
46 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding:utf-8 -*-
"""
@Author : xuxingchen
@Contact : xuxingchen@sinochem.com
@Desc : 记录车场信息在初始化时会初始化csv进行覆盖
"""
import csv
import os
import time
from typing import Optional, List
import re
from pydantic import BaseModel, field_validator, Field
from device.call import ServicesCall, UpdateFixedCardItem, VehicleRegistrationItem
from models.devices import DevicesTable
from utils.database import BaseTable, get_table_handler
from utils.misc import (now_datetime_second, InvalidException, now_tz_datetime, millisecond_timestamp2tz,
encrypt_number, decrypt_number)
class AddParkingLot(BaseModel):
parkinglot_number: str = Field(description="车场编号")
parkinglot_name: str = Field(description="车场名称")
factory_name: Optional[str] = Field(None, description="供应商")
@field_validator("parkinglot_number")
def check_parkinglot_number(cls, value):
th = get_table_handler()
if not ParkinglotsTable.exists(th, value, 'parkinglot'):
raise InvalidException("停车场编号无效")
if ParkinglotsTable.is_number_added(th, value, 'parkinglot'):
raise InvalidException("停车场编号已添加")
return value
class AddCarsCard(BaseModel):
car_type: str
owner_name: str
owner_phone: str
start_datetime: str
expire_datetime: str
car_ids: List[str]
associated_parking_space: str = ""
@field_validator("car_type")
def check_car_type(cls, value):
types = ["业主车辆", "员工车辆", "访客车辆", "其他车辆"]
if value not in types:
raise InvalidException(f"请提供正确的车辆类型:{types}")
return value
@field_validator("owner_phone")
def check_owner_phone(cls, value):
pattern = re.compile(r'^(13[0-9]|14[01456879]|15[0-35-9]|16[2567]|17[0-8]|18[0-9]|19[0-35-9])\d{8}$')
if pattern.search(value) is None:
raise InvalidException("请提供正确的手机号码")
th = get_table_handler()
if not ParkinglotsTable.exists_phone(th, value):
return value
else:
raise InvalidException(f"车辆所有人电话号码已存在")
@field_validator("car_ids")
def check_car_ids(cls, value):
th = get_table_handler()
for car_id in value:
if ParkinglotsTable.get_car_id_auth_status(th, car_id)["auth_status"]:
raise InvalidException(f"车牌号 {car_id} 已授权,请关联未授权的车牌号")
return value
class UpdateCarsCard(BaseModel):
auth_id: int
car_type: str
owner_name: str
owner_phone: str
start_datetime: str
expire_datetime: str
car_ids: List[str]
associated_parking_space: str
@field_validator("auth_id")
def check_auth_id(cls, value):
th = get_table_handler()
if not ParkinglotsTable.exists_auth_id(th, value):
raise InvalidException(f"授权ID {value} 不存在")
else:
return value
@field_validator("car_type")
def check_car_type(cls, value):
types = ["业主车辆", "员工车辆", "访客车辆", "其他车辆"]
if value not in types:
raise InvalidException(f"请提供正确的车辆类型:{types}")
return value
@field_validator("start_datetime")
def check_datetime(cls, value):
if "T" in value and "Z" in value:
return value
pattern = r'^\d{4}-\d{2}-\d{2} \d{2}:\d{2}$'
if re.match(pattern, value):
return value
else:
return InvalidException(f"请提供正确的时间格式YYYY-MM-DD HH:MM")
@field_validator("owner_phone")
def check_owner_phone(cls, value):
pattern = re.compile(r'^(13[0-9]|14[01456879]|15[0-35-9]|16[2567]|17[0-8]|18[0-9]|19[0-35-9])\d{8}$')
if pattern.search(value) is None:
raise InvalidException("请提供正确的手机号码")
return value
class GetCarsInfo(BaseModel):
search_type: str
search_key: str
page: Optional[int] = None
limit: Optional[int] = None
@field_validator("search_type")
def check_search_type(cls, value):
types = {
"车牌号码": "car_id",
"车辆所有人": "owner",
"联系电话": "phone"
}
if value not in types.keys():
raise InvalidException(f"请提供正确的搜索类型 {list(types.keys())}")
else:
return types[value]
class ParkinglotsTable(BaseTable):
@staticmethod
def check(table_handler: BaseTable):
"""检测是否存在当前表"""
# parkinglots 车场信息表
init_config_path = os.path.join(os.path.dirname(os.path.abspath("__file__")),
"data/InitialData/parkinglots.csv")
table_handler.query("SELECT name FROM sqlite_master WHERE type='table' AND name='parkinglots'")
if table_handler.cursor.fetchone() is None:
table_handler.execute(
f"""
CREATE TABLE parkinglots (
id TEXT,
number TEXT,
name TEXT,
type TEXT,
parkinglot_id TEXT,
area_id TEXT,
channel_id TEXT,
factory_name TEXT,
channel_type TEXT,
is_online TEXT,
gate_status TEXT,
running_status TEXT,
update_datetime TEXT,
PRIMARY KEY (id, type)
)
"""
)
if os.path.exists(init_config_path):
with open(init_config_path, newline='', encoding='utf8') as csvfile:
csvreader = csv.reader(csvfile)
head = next(csvreader)
data = []
if len(head) == 12:
for row in csvreader:
# id,number,name,type,area_id,channel_id,factory_name,channel_type,is_online,gate_status,running_status
_id = row[0].strip()
number = row[1].strip() if row[1].strip() else None
name = row[2].strip()
_type = row[3].strip()
parkinglot_id = row[4].strip()
area_id = row[5].strip() if row[5].strip() else None
channel_id = row[6].strip() if row[6].strip() else None
factory_name = row[7].strip() if row[7].strip() else None
channel_type = row[8].strip() if row[8].strip() else None
is_online = row[9].strip() if row[9].strip() else None
gate_status = row[10].strip() if row[10].strip() else None
running_status = row[11].strip() if row[11].strip() else None
data.append((_id, number, name, _type, parkinglot_id, area_id, channel_id, factory_name,
channel_type, is_online, gate_status, running_status, now_datetime_second()))
table_handler.executemany(
f"""
INSERT OR IGNORE INTO parkinglots
(id, number, name, type, parkinglot_id, area_id, channel_id, factory_name, channel_type,
is_online, gate_status, running_status, update_datetime)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", data
)
# cars_card 车辆月卡信息表
table_handler.query("SELECT name FROM sqlite_master WHERE type='table' AND name='cars_card'")
if table_handler.cursor.fetchone() is None:
table_handler.execute(
f"""
CREATE TABLE cars_card (
auth_id INTEGER PRIMARY KEY AUTOINCREMENT,
card_id TEXT UNIQUE,
car_type TEXT,
owner TEXT,
phone TEXT UNIQUE,
start_datetime TEXT,
expire_datetime TEXT,
associated_parking_space TEXT,
update_datetime TEXT
)
"""
)
init_config_path = os.path.join(os.path.dirname(os.path.abspath("__file__")), "data/InitialData/cars_card.csv")
if os.path.exists(init_config_path):
with open(init_config_path, newline='', encoding='utf8') as csvfile:
csvreader = csv.reader(csvfile)
head = next(csvreader)
data = []
if len(head) == 7:
for row in csvreader:
# auth_id,car_type,owner,phone,start_datetime,expire_datetime
auth_id = row[0].strip()
card_id = row[1].strip()
car_type = row[2].strip()
owner = row[3].strip()
phone = encrypt_number(row[4].strip())
start_datetime = row[5].strip()
expire_datetime = row[6].strip()
data.append((auth_id, card_id, car_type, owner, phone,
start_datetime, expire_datetime, now_datetime_second()))
table_handler.executemany(
f"""
INSERT OR IGNORE INTO cars_card
(auth_id, card_id, car_type, owner, phone, start_datetime, expire_datetime, update_datetime)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""", data
)
# cars_auth 授权关联信息表
table_handler.query("SELECT name FROM sqlite_master WHERE type='table' AND name='cars_auth'")
if table_handler.cursor.fetchone() is None:
table_handler.execute(
f"""
CREATE TABLE cars_auth (
auth_id INTEGER,
car_id TEXT,
car_type TEXT,
registration_id TEXT,
update_datetime TEXT,
PRIMARY KEY (auth_id, car_id)
)
"""
)
init_config_path = os.path.join(os.path.dirname(os.path.abspath("__file__")), "data/InitialData/cars_auth.csv")
if os.path.exists(init_config_path):
with (open(init_config_path, newline='', encoding='utf8') as csvfile):
csvreader = csv.reader(csvfile)
head = next(csvreader)
data = []
if len(head) == 3:
for row in csvreader:
# auth_id,car_id,car_type
auth_id = row[0].strip()
car_id = row[1].strip()
car_type = row[2].strip()
registration_id = str(int(time.time() * (10 ** 7)))
data.append((auth_id, car_id, car_type, registration_id, now_datetime_second()))
table_handler.executemany(
f"""
INSERT OR IGNORE INTO cars_auth
(auth_id, car_id, car_type, registration_id, update_datetime)
VALUES (?, ?, ?, ?, ?)
""", data
)
# 移除 cars_card & cars_auth 表中过期的授权记录
now_datetime = now_datetime_second()[:-3]
table_handler.query(
"""
SELECT auth_id FROM cars_card
WHERE expire_datetime < ?
""", (now_datetime,)
)
res = table_handler.cursor.fetchall()
if res:
for i in res:
table_handler.execute("DELETE FROM cars_card WHERE auth_id = ?", (i[0],))
table_handler.execute("DELETE FROM cars_auth WHERE auth_id = ?", (i[0],))
@staticmethod
def get_parkinglots_id(table_handler: BaseTable, number: str, _type: str):
"""根据编码和_type查询停车场信息中的空间ID"""
table_handler.query("SELECT id from parkinglots WHERE number = ? AND type = ?", (number, _type))
res = table_handler.cursor.fetchall()
if res:
return res[0][0]
else:
raise InvalidException(f"{_type} 中不存在编码 {number} ")
@staticmethod
def get_auth_id_by_card_id(table_handler: BaseTable, card_id: str):
"""根据月卡ID查询车辆授权ID"""
table_handler.query("SELECT auth_id from cars_card WHERE card_id = ?", (card_id,))
res = table_handler.cursor.fetchall()
if res:
return res[0][0]
else:
raise InvalidException(f"月卡ID {card_id} 不存在 ")
@staticmethod
def get_parkinglots_info(table_handler: BaseTable):
table_handler.query(
"""
SELECT id, name, factory_name
FROM parkinglots
WHERE type = 'parkinglot' AND running_status is not null
"""
)
res = table_handler.cursor.fetchall()
if res:
info = []
for i in res:
info.append({
"number": i[0],
"name": i[1],
"factory_name": i[2] if i[2] else ""
})
return {"info": info}
else:
return {"info": []}
@staticmethod
def get_showed_parkinglot_area_info(table_handler: BaseTable):
table_handler.query(
"""
SELECT area_number, name, parkinglot_name, '逻辑区域' as area_type, pc.channel_count, pc.channel_name
FROM (SELECT id area_id, number area_number, name, parkinglot_name
FROM parkinglots pa
LEFT JOIN (SELECT id parkinglot_number, name parkinglot_name
FROM parkinglots
WHERE type = 'parkinglot' AND running_status IS NOT NULL)
ON parkinglot_number = pa.parkinglot_id
WHERE type = 'area' AND parkinglot_number IS NOT NULL) pa
LEFT JOIN (SELECT area_id, count(id) channel_count, GROUP_CONCAT(name) channel_name
FROM parkinglots
WHERE type = 'channel'
GROUP BY area_id) pc ON pc.area_id = pa.area_id
"""
)
res = table_handler.cursor.fetchall()
if res:
info = []
for i in res:
info.append({
"area_number": i[0],
"area_name": i[1],
"parkinglot_name": i[2],
"area_type": i[3],
"channel_count": i[4],
"channel_name": i[5]
})
return {"info": info}
else:
return {"info": []}
@staticmethod
def get_showed_parkinglot_channel_info(table_handler: BaseTable):
table_handler.query(
"""
SELECT number, name, channel_type, parkinglot_name
FROM parkinglots pa
LEFT JOIN (SELECT id parkinglot_number, name parkinglot_name
FROM parkinglots
WHERE type = 'parkinglot' AND running_status IS NOT NULL)
ON parkinglot_number = pa.parkinglot_id
WHERE type = 'channel' AND parkinglot_number IS NOT NULL
"""
)
res = table_handler.cursor.fetchall()
if res:
info = []
for i in res:
info.append({
"channel_number": i[0],
"channel_name": i[1],
"channel_type": i[2],
"parkinglot_name": i[3]
})
return {"info": info}
else:
return {"info": []}
@staticmethod
def get_showed_parkinglot_gate_info(table_handler: BaseTable):
table_handler.query(
"""
SELECT number, name, is_online, gate_status, running_status, parkinglot_name, channel_name
FROM parkinglots pa
LEFT JOIN (SELECT id parkinglot_number, name parkinglot_name
FROM parkinglots
WHERE type = 'parkinglot' AND running_status IS NOT NULL)
ON parkinglot_number = pa.parkinglot_id
LEFT JOIN (SELECT id channel_id, name channel_name
FROM parkinglots
WHERE type = 'channel') pc
ON pc.channel_id = pa.channel_id
WHERE type = 'gate' AND parkinglot_number IS NOT NULL
"""
)
res = table_handler.cursor.fetchall()
info = []
if res:
for i in res:
info.append({
"gate_number": i[0],
"gate_name": i[1],
"is_online": i[2],
"gate_status": i[3],
"running_status": i[4],
"parkinglot_name": i[5],
"channel_name": i[6] if i[6] else ""
})
return {"info": info}
@staticmethod
def get_cars_info(table_handler: BaseTable, search_type: Optional[str], search_key: Optional[str],
page: Optional[int] = None, limit: Optional[int] = None):
sub_sql_list = []
if search_type is not None:
if search_type == "owner":
sub_sql_list.append(f"owner like '%{search_key}%'")
elif search_type == "car_id":
sub_sql_list.append(f"car_ids like '%{search_key}%'")
else:
sub_sql_list.append(f"{search_type} = '{search_key}'")
if len(sub_sql_list) > 0:
sub_sql = "WHERE " + " AND ".join(sub_sql_list)
else:
sub_sql = ""
if page is not None and limit is not None:
page_sql = f"LIMIT {limit} OFFSET {(page - 1) * limit}"
else:
page_sql = ""
query_sql = f"""
SELECT cars_card.auth_id, car_ids, car_type, owner, phone, start_datetime, expire_datetime
FROM cars_card
LEFT JOIN (SELECT auth_id, GROUP_CONCAT(car_id) car_ids
FROM cars_auth GROUP BY auth_id) ca
ON ca.auth_id = cars_card.auth_id {sub_sql}
"""
table_handler.query(f"SELECT COUNT(*) FROM ({query_sql})")
total = table_handler.cursor.fetchall()[0][0]
table_handler.query(f"{query_sql} {page_sql}")
res = table_handler.cursor.fetchall()
info = []
if res:
for i in res:
info.append({
"auth_id": i[0],
"car_ids": i[1],
"car_type": i[2],
"owner_name": i[3],
"owner_phone": decrypt_number(i[4]),
"start_datetime": i[5],
"expire_datetime": i[6]
})
return {"info": info, "total": total}
@staticmethod
def get_auth_info(table_handler: BaseTable, auth_id: int):
table_handler.query(
"""
SELECT cars_card.auth_id, car_type, owner, phone, start_datetime, expire_datetime,
car_ids, associated_parking_space
FROM cars_card
LEFT JOIN (SELECT auth_id, GROUP_CONCAT(car_id) car_ids FROM cars_auth GROUP BY auth_id) ca
ON ca.auth_id = cars_card.auth_id
WHERE cars_card.auth_id = ?
""", (auth_id,)
)
res = table_handler.cursor.fetchall()
if res:
car_ids = [i for i in res[0][6].split(',')]
return {
"auth_id": res[0][0],
"car_type": res[0][1],
"owner_name": res[0][2],
"owner_phone": decrypt_number(res[0][3]),
"start_datetime": res[0][4],
"expire_datetime": res[0][5],
"car_ids": car_ids,
"car_count": len(car_ids),
"associated_parking_space": res[0][7] if res[0][7] else ''
}
else:
raise InvalidException("授权ID不存在")
@staticmethod
def get_car_id_auth_status(table_handler: BaseTable, car_id: str):
now_datetime = now_datetime_second()[:-3]
table_handler.query(
"""
SELECT cars_auth.auth_id, car_id, start_datetime, expire_datetime
FROM cars_auth
LEFT JOIN cars_card ON cars_card.auth_id = cars_auth.auth_id
WHERE car_id = ? AND expire_datetime > ?
""", (car_id, now_datetime)
)
res = table_handler.cursor.fetchall()
if res:
return {"auth_status": True}
else:
return {"auth_status": False}
@staticmethod
def get_auth_info_by_auth_id(table_handler: BaseTable, auth_id):
table_handler.query(
"""
SELECT
(SELECT card_id FROM cars_card WHERE auth_id = ?) AS card_id,
(SELECT owner FROM cars_card WHERE auth_id = ?) AS owner_name,
(SELECT phone FROM cars_card WHERE auth_id = ?) AS owner_phone,
(SELECT car_type FROM cars_card WHERE auth_id = ?) AS car_type,
(SELECT GROUP_CONCAT(device_id) FROM devices_auth WHERE _id = ? AND record_type = '车行') AS device_ids,
(SELECT GROUP_CONCAT(car_id) FROM cars_auth WHERE auth_id = ?) AS car_ids,
(SELECT GROUP_CONCAT(cars_auth.registration_id) FROM cars_auth WHERE auth_id = ?) AS reg_ids
""", (auth_id, auth_id, auth_id, auth_id, auth_id, auth_id, auth_id)
)
res = table_handler.cursor.fetchall()
card_id, owner_name, owner_phone, car_type = None, None, None, None
device_ids = []
car_ids = []
reg_ids = []
if res:
if res[0][0] is not None:
card_id = res[0][0]
if res[0][1] is not None:
owner_name = res[0][1]
if res[0][2] is not None:
owner_phone = decrypt_number(res[0][2])
if res[0][3] is not None:
car_type = res[0][3]
if res[0][4] is not None:
device_ids = [i for i in res[0][4].split(',')]
if res[0][5] is not None:
car_ids = [i for i in res[0][5].split(',')]
if res[0][6] is not None:
reg_ids = [i for i in res[0][6].split(',')]
return card_id, owner_name, owner_phone, car_type, device_ids, car_ids, reg_ids
@staticmethod
def get_card_id_by_auth_id(table_handler: BaseTable, auth_id: int):
table_handler.query("SELECT card_id FROM cars_card WHERE auth_id = ?", (auth_id,))
res = table_handler.cursor.fetchall()
if res:
return res[0][0]
else:
raise InvalidException(f"授权ID {auth_id} 不存在")
@staticmethod
def get_car_card_interval(table_handler: BaseTable, card_id: str):
"""获取对应月卡的有效期间"""
table_handler.query("SELECT start_datetime, expire_datetime FROM cars_card WHERE card_id = ?", (card_id,))
res = table_handler.cursor.fetchall()
if res:
return res[0][0], res[0][1]
else:
raise InvalidException(f"月卡ID {card_id} 未查询到对应有效期记录")
@staticmethod
def get_auth_id_interval(table_handler: BaseTable, auth_id: int):
"""获取对应月卡的有效期间"""
table_handler.query("SELECT start_datetime, expire_datetime FROM cars_card WHERE auth_id = ?", (auth_id,))
res = table_handler.cursor.fetchall()
if res:
return res[0][0], res[0][1]
else:
raise InvalidException(f"授权ID {auth_id} 未查询到对应有效期记录")
@staticmethod
def get_car_reg_id(table_handler: BaseTable, car_id: str):
table_handler.query("SELECT registration_id FROM cars_auth WHERE car_id = ?", (car_id,))
res = table_handler.cursor.fetchall()
if res:
return res[0][0]
else:
raise InvalidException(f"Car ID {car_id} 未查询到对应注册ID")
@staticmethod
def add_update_cars_auth_record(table_handler: BaseTable, auth_id: int, car_ids: list, reg_ids: list):
cars_auth_data = []
for i, car_id in enumerate(car_ids):
car_type = "新能源" if len(car_id) == 7 else "普通"
nt = now_datetime_second()
cars_auth_data.append((auth_id, car_id, car_type, reg_ids[i], nt, reg_ids[i], nt))
table_handler.executemany(
"""
INSERT INTO cars_auth
(auth_id, car_id, car_type, registration_id, update_datetime)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT (auth_id, car_id)
DO UPDATE SET registration_id = ?, update_datetime = ?
""", cars_auth_data
)
@staticmethod
def add_car_card(table_handler: BaseTable, card_id: str, device_ids: list, reg_ids: list,
obj: AddCarsCard | UpdateCarsCard):
"""增加车辆授权的月卡信息,插入到 cars_card & cars_auth & devices_auth"""
table_handler.execute(
"""
INSERT INTO cars_card
(card_id, car_type, owner, phone, associated_parking_space,
start_datetime, expire_datetime, update_datetime)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""", (card_id, obj.car_type, obj.owner_name, encrypt_number(obj.owner_phone), obj.associated_parking_space,
obj.start_datetime, obj.expire_datetime, now_datetime_second())
)
auth_id = ParkinglotsTable.get_auth_id_by_card_id(table_handler, card_id)
# cars_auth
ParkinglotsTable.add_update_cars_auth_record(table_handler, auth_id, obj.car_ids, reg_ids)
# devices_auth
st = obj.start_datetime
et = obj.expire_datetime
for device_id in device_ids:
DevicesTable.add_update_device_auth(table_handler, device_id, [auth_id], "车行", st, et)
@staticmethod
def add_car_card_auth(table_handler: BaseTable, obj: AddCarsCard):
"""注册车辆 & 新建月卡 & 保留记录"""
# 1. 获取车场系统 device_id
device_ids = DevicesTable.get_device_ids(table_handler, filter_name="停车")
if len(device_ids) > 0:
sc = ServicesCall()
vri = VehicleRegistrationItem(
car_plate_no="",
registration_id="",
owner_id=obj.owner_phone,
owner_name=obj.owner_name,
registration_time=now_tz_datetime(),
begin_time=obj.start_datetime,
action="insert",
registration_type='0' if "业主" in obj.car_type else '1'
)
base_reg_id = int(time.time() * (10 ** 7))
reg_ids = [str(base_reg_id + 1) for _ in obj.car_ids] # 不同设备、相同车牌 相同注册ID
success_device_ids_0 = []
for device_index, device_id in enumerate(device_ids):
# 2. 注册车辆,只要存在失败的设备,就对所有之前成功的设备进行回滚
for car_id_index, car_id in enumerate(obj.car_ids):
vri.car_plate_no = car_id
vri.registration_id = reg_ids[car_id_index]
_callback, code, msg = sc.vehicle_registration(device_id, vri)
if not _callback and "UNIQUE" not in msg:
# 执行回滚
if len(success_device_ids_0) > 1:
for success_device_id in success_device_ids_0:
for j, _id in enumerate(obj.car_ids):
vri.car_plate_no = _id
vri.registration_id = reg_ids[j]
vri.action = "delete"
sc.vehicle_registration(success_device_id, vri)
raise InvalidException(f"车辆注册失败,设备:{device_id},错误码:{code}{msg},已执行回滚")
else:
if "UNIQUE" in msg:
vri.action = "update"
_callback, code, msg = sc.vehicle_registration(device_id, vri)
success_device_ids_0.append(device_id)
# 3. 新建月卡
multi_car_id = '|'.join(obj.car_ids)
t = str(int(time.time() * 1000))
card_id = t[:8] + '-' + t[8:12] + '-' + t[12:] + "000-0000-000000000000" # 不同设备、相同车牌 相同月卡ID
ufci = UpdateFixedCardItem(
vehicle_owner_id=obj.owner_phone,
vehicle_owner_name=obj.owner_name,
vehicle_owner_phone=obj.owner_phone,
plate_number=multi_car_id,
effective_date_begin=obj.start_datetime,
effective_date_end=obj.expire_datetime,
action="insert",
card_id=card_id
)
success_device_ids_1 = []
for device_index, device_id in enumerate(device_ids):
_callback, code, msg = sc.update_fixed_card(device_id, ufci)
if not _callback:
# 月卡新建回滚
if len(success_device_ids_1) > 1:
for success_device_id in success_device_ids_1:
ufci.action = "delete"
sc.update_fixed_card(success_device_id, ufci)
# 车辆注册回滚
if len(success_device_ids_0) > 1:
for success_device_id in success_device_ids_0:
for j, _id in enumerate(obj.car_ids):
vri.car_plate_no = _id
vri.registration_id = reg_ids[j]
vri.action = "delete"
sc.vehicle_registration(success_device_id, vri)
raise InvalidException(f"车辆新建月卡失败,设备:{device_id},错误码:{code},已执行回滚")
else:
success_device_ids_1.append(device_id)
# 4. 记录授权 cars_card & cars_auth & devices_auth
ParkinglotsTable.add_car_card(table_handler, card_id, device_ids, reg_ids, obj)
return {"status": True}
@staticmethod
def delete_car_auth_id(table_handler: BaseTable, auth_id: int):
"""移除授权ID下的基本信息及关联车辆授权"""
# 1. 确认查询授权ID有效
if ParkinglotsTable.exists_auth_id(table_handler, auth_id):
# 2. 获取相关授权设备ID & 车辆列表,在设备上进行 月卡删除 & 车辆删除(→_→暂不处理)
card_id, owner_name, owner_phone, car_type, device_ids, car_ids, _ = \
ParkinglotsTable.get_auth_info_by_auth_id(table_handler, auth_id)
start_date, expire_date = ParkinglotsTable.get_car_card_interval(table_handler, card_id)
if len(device_ids) > 0:
success_ids = []
fail_ids = []
fail_code = []
fail_msg = []
sc = ServicesCall()
update_card_item = UpdateFixedCardItem(
vehicle_owner_id=owner_phone,
vehicle_owner_name=owner_name,
vehicle_owner_phone=owner_phone,
plate_number="",
effective_date_begin=start_date,
effective_date_end=expire_date,
action="delete",
card_id=card_id
)
for device_id in device_ids:
update_card_item.plate_number = '|'.join(car_ids)
# 3. 设备移除授权
_callback, code, msg = sc.update_fixed_card(device_id, update_card_item)
if not _callback:
fail_ids.append(device_id)
fail_code.append(code)
fail_msg.append(msg)
else:
success_ids.append(device_id)
# 4. 删除授权ID cars_card & cars_auth & devices_auth 记录
if len(success_ids) == len(device_ids): # 全部设备执行成功
sqls = [
"DELETE FROM cars_card WHERE auth_id = ?",
"DELETE FROM cars_auth WHERE auth_id = ?",
"DELETE FROM devices_auth WHERE _id = ? AND record_type = '车行'",
]
params = [(auth_id,), (auth_id,), (auth_id,)]
table_handler.execute(sqls, params)
return {"status": True}
elif len(success_ids) > 0:
sqls = [
"DELETE FROM cars_card WHERE auth_id = ?",
"DELETE FROM cars_auth WHERE auth_id = ?"
]
_sql = "DELETE FROM devices_auth WHERE _id = ? AND device_id = ? AND record_type = '车行'"
sqls += [_sql for _ in range(len(success_ids))]
params = [(auth_id,), (auth_id,)]
params += [(auth_id, success_id) for success_id in success_ids]
table_handler.execute(sqls, params)
raise InvalidException(f"部分授权移除失败设备ID {fail_ids}, 错误码: {fail_code}, {fail_msg}")
else:
raise InvalidException(f"授权移除失败设备ID {fail_ids}, 错误码: {fail_code}, {fail_msg}")
else:
raise InvalidException(f"授权ID {auth_id} 无效")
@staticmethod
def delete_car_auth(table_handler: BaseTable, delete_car_list):
"""删除车辆授权的月卡信息cars_auth"""
params = []
for car_id in delete_car_list:
params.append((car_id,))
table_handler.executemany("DELETE FROM cars_auth WHERE car_id = ?", params)
@staticmethod
def update_car_card(table_handler: BaseTable, auth_id: int, car_type: str, owner_name: str, owner_phone: str,
associated_parking_space: str, start_datetime: str, expire_datetime: str):
"""更新车辆授权的月卡信息cars_card & cars_auth & devices_auth"""
table_handler.execute(
"""
UPDATE cars_card SET
car_type = ?, owner = ?, phone = ?, associated_parking_space = ?,
start_datetime = ?, expire_datetime = ?, update_datetime = ?
WHERE auth_id = ?
""", (car_type, owner_name, encrypt_number(owner_phone), associated_parking_space,
start_datetime, expire_datetime, now_datetime_second(), auth_id)
)
@staticmethod
def update_car_auth(table_handler: BaseTable, device_ids: list, reg_ids: list, obj: UpdateCarsCard):
"""更新车辆授权的车辆信息和设备授权日期cars_auth & devices_auth"""
# cars_auth
ParkinglotsTable.add_update_cars_auth_record(table_handler, obj.auth_id, obj.car_ids, reg_ids)
# devices_auth
st = obj.start_datetime
et = obj.expire_datetime
for device_id in device_ids:
DevicesTable.add_update_device_auth(table_handler, device_id, [str(obj.auth_id)], "车行", st, et)
@staticmethod
def update_parkinglot_show(table_handler: BaseTable,
_id: str, name: Optional[str], factory_name: Optional[str], is_show: bool):
"""更新停车场显示状态"""
if not ParkinglotsTable.exists_number(table_handler, _id, 'parkinglot'):
raise InvalidException(f"不存在编码 {_id} ")
status = 'show' if is_show else None
sub_sql_list = []
if name is not None:
sub_sql_list.append(f"name = '{name}'")
if factory_name is not None:
sub_sql_list.append(f"factory_name = '{factory_name}'")
if len(sub_sql_list) > 0:
sub_sql = ', '.join(sub_sql_list) + ', '
else:
sub_sql = ''
table_handler.execute(
f"""
UPDATE parkinglots
SET {sub_sql}running_status = ?, update_datetime = ?
WHERE type = 'parkinglot' AND id = ?
""", (status, now_datetime_second(), _id)
)
return {"status": True}
@staticmethod
def update_car_card_auth(table_handler: BaseTable, obj: UpdateCarsCard):
"""更新车辆 & 更新月卡 & 保留记录"""
# 1. 获取车场系统 device_id
device_ids = DevicesTable.get_device_ids(table_handler, filter_name="停车")
if len(device_ids) > 0:
old_st, old_et = ParkinglotsTable.get_auth_id_interval(table_handler, obj.auth_id)
registration_time = millisecond_timestamp2tz(
ParkinglotsTable.get_car_reg_id(table_handler, str(obj.car_ids[0]))[:13])
sc = ServicesCall()
vri = VehicleRegistrationItem(
car_plate_no="",
registration_id="",
owner_id=obj.owner_phone,
owner_name=obj.owner_name,
registration_time=registration_time,
begin_time=obj.start_datetime,
action="",
registration_type='0' if "业主" in obj.car_type else '1'
)
# 2. 查询当前授权情况,获取 已授权车辆列表、待授权车辆列表、待移除车辆列表、待更新车辆列表
card_id, owner_name, owner_phone, car_type, device_ids, authed_car_list, authed_reg_ids = \
ParkinglotsTable.get_auth_info_by_auth_id(table_handler, obj.auth_id)
insert_car_list = obj.car_ids.copy()
delete_car_list = []
update_car_list = []
if (obj.start_datetime < now_datetime_second()[:-3] or obj.start_datetime == old_st) and \
obj.expire_datetime == old_et and obj.owner_phone == owner_phone and \
obj.owner_name == owner_name:
# 若有效时间+人名+手机号码未变更则不变的车辆不需更新,只考虑新增的车辆、移除的车辆
for car_id in obj.car_ids:
if car_id in authed_car_list:
insert_car_list.remove(car_id)
for car_id in authed_car_list:
if car_id not in obj.car_ids:
delete_car_list.append(car_id)
else:
for car_id in obj.car_ids:
if car_id in authed_car_list:
insert_car_list.remove(car_id)
update_car_list.append(car_id)
for car_id in authed_car_list:
if car_id not in obj.car_ids:
delete_car_list.append(car_id)
base_reg_id = int(time.time() * (10 ** 7))
reg_ids = [str(base_reg_id + 1) for _ in insert_car_list] # 不同设备、相同车牌 相同注册ID
success_insert_vri = []
success_delete_vri = []
success_update_origin_vri = []
fail_callback = False
for device_index, device_id in enumerate(device_ids):
# 3. 注册车辆 & 删除车辆 & 更新车辆
for car_id_index, car_id in enumerate(insert_car_list):
vri.car_plate_no = car_id
vri.registration_id = reg_ids[car_id_index]
vri.action = "insert"
_callback, code, msg = sc.vehicle_registration(device_id, vri)
if _callback or "UNIQUE" not in msg:
if not _callback and "UNIQUE" not in msg:
update_car_list.append(car_id)
else:
if "UNIQUE" in msg:
vri.action = "update"
_callback, _, _ = sc.vehicle_registration(device_id, vri)
success_insert_vri.append([device_id, vri.copy()])
else:
fail_callback = True
break
for car_id in delete_car_list:
vri.car_plate_no = car_id
vri.registration_id = ParkinglotsTable.get_car_reg_id(table_handler, car_id)
vri.action = "delete"
_callback, code, msg = sc.vehicle_registration(device_id, vri)
if _callback:
success_delete_vri.append([device_id, vri.copy()])
else:
fail_callback = True
break
for car_id in update_car_list:
vri.car_plate_no = car_id
vri.registration_id = ParkinglotsTable.get_car_reg_id(table_handler, car_id)
vri.action = "update"
_callback, code, msg = sc.vehicle_registration(device_id, vri)
if _callback:
vri.owner_id = owner_phone
vri.owner_name = owner_name
vri.begin_time = old_st
vri.registration_time = registration_time
vri.registration_type = car_type
success_update_origin_vri.append([device_id, vri.copy()])
else:
fail_callback = True
break
# TODO 回滚逻辑待校验
if fail_callback: # 只要存在失败的设备,就对所有之前成功的设备进行回滚
for insert_item in success_insert_vri:
insert_item[1].action = "delete"
_callback, _, _ = sc.vehicle_registration(insert_item[0], insert_item[1])
for delete_item in success_delete_vri:
delete_item[1].action = "insert"
_callback, _, _ = sc.vehicle_registration(delete_item[0], delete_item[1])
for update_item in success_update_origin_vri:
_callback, _, _ = sc.vehicle_registration(update_item[0], update_item[1])
# 3. 更新月卡
multi_car_id = '|'.join(obj.car_ids)
card_id = ParkinglotsTable.get_card_id_by_auth_id(table_handler, obj.auth_id)
ufci = UpdateFixedCardItem(
vehicle_owner_id=obj.owner_phone,
vehicle_owner_name=obj.owner_name,
vehicle_owner_phone=obj.owner_phone,
plate_number=multi_car_id,
effective_date_begin=obj.start_datetime,
effective_date_end=obj.expire_datetime,
action="update",
card_id=card_id
)
success_device_ids_1 = []
for device_id in device_ids:
_callback, code, msg = sc.update_fixed_card(device_id, ufci)
if not _callback:
raise InvalidException(f"车辆更新月卡失败,设备:{device_id},错误码:{code}{msg}")
else:
success_device_ids_1.append(device_id)
# 4. 记录授权 cars_card & cars_auth & devices_auth
ParkinglotsTable.update_car_card(table_handler, obj.auth_id, obj.car_type,
obj.owner_name, obj.owner_phone, obj.associated_parking_space,
obj.start_datetime, obj.expire_datetime)
if insert_car_list:
obj.car_ids = insert_car_list
ParkinglotsTable.update_car_auth(table_handler, device_ids, reg_ids, obj)
if delete_car_list:
ParkinglotsTable.delete_car_auth(table_handler, delete_car_list)
return {"status": True}
@staticmethod
def is_number_added(table_handler: BaseTable, number: str, _type: str):
"""判断停车场编号是否已添加显示"""
number_col = "number" if _type != "parkinglot" else "id"
table_handler.query(
f"SELECT id FROM parkinglots WHERE {number_col} = ? and type = ? and running_status is not null",
(number, _type))
res = table_handler.cursor.fetchall()
if res:
return True
else:
return False
@staticmethod
def exists(table_handler: BaseTable, _id: str, _type: str):
table_handler.query(f"SELECT id FROM parkinglots WHERE id = ? and type = ?", (_id, _type))
res = table_handler.cursor.fetchall()
if res:
return True
else:
return False
@staticmethod
def exists_number(table_handler: BaseTable, number: str, _type: str):
number_col = "number" if _type != "parkinglot" else "id"
table_handler.query(f"SELECT id FROM parkinglots WHERE {number_col} = ? and type = ?",
(number, _type))
res = table_handler.cursor.fetchall()
if res:
return True
else:
return False
@staticmethod
def exists_phone(table_handler: BaseTable, phone: str):
table_handler.query(f"SELECT auth_id FROM cars_card WHERE phone = ?",
(encrypt_number(phone),))
res = table_handler.cursor.fetchall()
if res:
return True
else:
return False
@staticmethod
def exists_auth_id(table_handler: BaseTable, auth_id: int):
table_handler.query(f"SELECT auth_id FROM cars_card WHERE auth_id = ?",
(auth_id,))
res = table_handler.cursor.fetchall()
if res:
return True
else:
return False