1020 lines
46 KiB
Python
1020 lines
46 KiB
Python
# -*- coding:utf-8 -*-
"""
@Author   : xuxingchen
@Contact  : xuxingchen@sinochem.com
@Desc     : Records parking-lot information; during initialization the initial CSV data is loaded to overwrite it
"""
|
||
import csv
|
||
import os
|
||
import time
|
||
from typing import Optional, List
|
||
import re
|
||
|
||
from pydantic import BaseModel, field_validator, Field
|
||
|
||
from device.call import ServicesCall, UpdateFixedCardItem, VehicleRegistrationItem
|
||
from models.devices import DevicesTable
|
||
from utils.database import BaseTable, get_table_handler
|
||
from utils.misc import (now_datetime_second, InvalidException, now_tz_datetime, millisecond_timestamp2tz,
|
||
encrypt_number, decrypt_number)
|
||
|
||
|
||
class AddParkingLot(BaseModel):
    """Payload describing a parking lot to add.

    ``parkinglot_number`` is checked against ``ParkinglotsTable``: it has to
    pass ``exists`` and must not be reported by ``is_number_added``.
    """
    parkinglot_number: str = Field(description="车场编号")
    parkinglot_name: str = Field(description="车场名称")
    factory_name: Optional[str] = Field(None, description="供应商")

    @field_validator("parkinglot_number")
    def check_parkinglot_number(cls, value):
        """Reject a parking-lot number that is unknown or already added.

        Raises:
            InvalidException: when the number does not exist, or when it has
                already been added.
        """
        handler = get_table_handler()
        known = ParkinglotsTable.exists(handler, value, 'parkinglot')
        if not known:
            raise InvalidException("停车场编号无效")
        already_added = ParkinglotsTable.is_number_added(handler, value, 'parkinglot')
        if already_added:
            raise InvalidException("停车场编号已添加")
        return value
|
||
|
||
|
||
class AddCarsCard(BaseModel):
    """Payload for creating a new vehicle card together with its plates."""
    car_type: str
    owner_name: str
    owner_phone: str
    start_datetime: str
    expire_datetime: str
    car_ids: List[str]
    associated_parking_space: str = ""

    @field_validator("car_type")
    def check_car_type(cls, value):
        """Accept only one of the four supported vehicle categories."""
        allowed = ["业主车辆", "员工车辆", "访客车辆", "其他车辆"]
        if value in allowed:
            return value
        raise InvalidException(f"请提供正确的车辆类型:{allowed}")

    @field_validator("owner_phone")
    def check_owner_phone(cls, value):
        """Require a well-formed mobile number that is not yet registered.

        Raises:
            InvalidException: when the number fails the pattern, or when
                ``ParkinglotsTable.exists_phone`` reports it as present.
        """
        phone_re = re.compile(r'^(13[0-9]|14[01456879]|15[0-35-9]|16[2567]|17[0-8]|18[0-9]|19[0-35-9])\d{8}$')
        if phone_re.search(value) is None:
            raise InvalidException("请提供正确的手机号码")
        handler = get_table_handler()
        if ParkinglotsTable.exists_phone(handler, value):
            raise InvalidException("车辆所有人电话号码已存在")
        return value

    @field_validator("car_ids")
    def check_car_ids(cls, value):
        """Refuse any plate that currently holds an authorization."""
        handler = get_table_handler()
        for plate in value:
            status = ParkinglotsTable.get_car_id_auth_status(handler, plate)
            if status["auth_status"]:
                raise InvalidException(f"车牌号 {plate} 已授权,请关联未授权的车牌号")
        return value
|
||
|
||
|
||
class UpdateCarsCard(BaseModel):
    """Payload for updating an existing vehicle card identified by ``auth_id``."""
    auth_id: int
    car_type: str
    owner_name: str
    owner_phone: str
    start_datetime: str
    expire_datetime: str
    car_ids: List[str]
    associated_parking_space: str

    @field_validator("auth_id")
    def check_auth_id(cls, value):
        """Require that the authorization ID exists.

        Raises:
            InvalidException: when ``ParkinglotsTable.exists_auth_id`` is false.
        """
        th = get_table_handler()
        if not ParkinglotsTable.exists_auth_id(th, value):
            raise InvalidException(f"授权ID {value} 不存在")
        return value

    @field_validator("car_type")
    def check_car_type(cls, value):
        """Accept only one of the four supported vehicle categories."""
        types = ["业主车辆", "员工车辆", "访客车辆", "其他车辆"]
        if value not in types:
            raise InvalidException(f"请提供正确的车辆类型:{types}")
        return value

    @field_validator("start_datetime")
    def check_datetime(cls, value):
        """Validate the start time format.

        A value containing both "T" and "Z" is passed through unchanged;
        anything else must match ``YYYY-MM-DD HH:MM``.

        Raises:
            InvalidException: when the value matches neither form.
        """
        if "T" in value and "Z" in value:
            return value
        pattern = r'^\d{4}-\d{2}-\d{2} \d{2}:\d{2}$'
        if re.match(pattern, value):
            return value
        # The exception must be raised: returning it would make the exception
        # object the field value and let the malformed input through.
        raise InvalidException("请提供正确的时间格式:YYYY-MM-DD HH:MM")

    @field_validator("owner_phone")
    def check_owner_phone(cls, value):
        """Require a well-formed mobile number."""
        pattern = re.compile(r'^(13[0-9]|14[01456879]|15[0-35-9]|16[2567]|17[0-8]|18[0-9]|19[0-35-9])\d{8}$')
        if pattern.search(value) is None:
            raise InvalidException("请提供正确的手机号码")
        return value
|
||
|
||
|
||
class GetCarsInfo(BaseModel):
    """Query parameters for searching vehicle cards."""
    search_type: str
    search_key: str
    page: Optional[int] = None
    limit: Optional[int] = None

    @field_validator("search_type")
    def check_search_type(cls, value):
        """Translate the display name of a search type into its column name.

        Raises:
            InvalidException: when the value is not one of the known names.
        """
        column_by_label = {
            "车牌号码": "car_id",
            "车辆所有人": "owner",
            "联系电话": "phone"
        }
        column = column_by_label.get(value)
        if column is None:
            raise InvalidException(f"请提供正确的搜索类型 {list(column_by_label)}")
        return column
|
||
|
||
|
||
class ParkinglotsTable(BaseTable):
|
||
@staticmethod
def check(table_handler: BaseTable):
    """Create the parkinglots / cars_card / cars_auth tables when missing and seed them from CSV.

    For each table the method looks the name up in ``sqlite_master`` and
    issues ``CREATE TABLE`` when it is absent, then loads rows from the
    matching file under ``data/InitialData`` with ``INSERT OR IGNORE``.
    Finally it deletes cars_card / cars_auth rows whose card has expired.

    NOTE(review): the source this block was recovered from had lost its
    indentation. Whether each CSV import and the final purge run on every
    call (as laid out here) or only right after the table is created could
    not be determined — confirm against the canonical file.
    """
    # parkinglots: parking-lot information table
    # "__file__" is a string literal here, so abspath() resolves it against the
    # current working directory and the CSV is looked up under <cwd>/data/InitialData.
    init_config_path = os.path.join(os.path.dirname(os.path.abspath("__file__")),
                                    "data/InitialData/parkinglots.csv")
    table_handler.query("SELECT name FROM sqlite_master WHERE type='table' AND name='parkinglots'")
    if table_handler.cursor.fetchone() is None:
        table_handler.execute(
            f"""
            CREATE TABLE parkinglots (
                id TEXT,
                number TEXT,
                name TEXT,
                type TEXT,
                parkinglot_id TEXT,
                area_id TEXT,
                channel_id TEXT,
                factory_name TEXT,
                channel_type TEXT,
                is_online TEXT,
                gate_status TEXT,
                running_status TEXT,
                update_datetime TEXT,
                PRIMARY KEY (id, type)
            )
            """
        )
    if os.path.exists(init_config_path):
        with open(init_config_path, newline='', encoding='utf8') as csvfile:
            csvreader = csv.reader(csvfile)
            head = next(csvreader)
            data = []
            # Rows are only read when the header has exactly 12 columns.
            if len(head) == 12:
                for row in csvreader:
                    # id,number,name,type,parkinglot_id,area_id,channel_id,factory_name,
                    # channel_type,is_online,gate_status,running_status
                    # Blank optional cells are stored as NULL.
                    _id = row[0].strip()
                    number = row[1].strip() if row[1].strip() else None
                    name = row[2].strip()
                    _type = row[3].strip()
                    parkinglot_id = row[4].strip()
                    area_id = row[5].strip() if row[5].strip() else None
                    channel_id = row[6].strip() if row[6].strip() else None
                    factory_name = row[7].strip() if row[7].strip() else None
                    channel_type = row[8].strip() if row[8].strip() else None
                    is_online = row[9].strip() if row[9].strip() else None
                    gate_status = row[10].strip() if row[10].strip() else None
                    running_status = row[11].strip() if row[11].strip() else None
                    data.append((_id, number, name, _type, parkinglot_id, area_id, channel_id, factory_name,
                                 channel_type, is_online, gate_status, running_status, now_datetime_second()))
            table_handler.executemany(
                f"""
                INSERT OR IGNORE INTO parkinglots
                (id, number, name, type, parkinglot_id, area_id, channel_id, factory_name, channel_type,
                is_online, gate_status, running_status, update_datetime)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """, data
            )

    # cars_card: vehicle monthly-card table
    table_handler.query("SELECT name FROM sqlite_master WHERE type='table' AND name='cars_card'")
    if table_handler.cursor.fetchone() is None:
        table_handler.execute(
            f"""
            CREATE TABLE cars_card (
                auth_id INTEGER PRIMARY KEY AUTOINCREMENT,
                card_id TEXT UNIQUE,
                car_type TEXT,
                owner TEXT,
                phone TEXT UNIQUE,
                start_datetime TEXT,
                expire_datetime TEXT,
                associated_parking_space TEXT,
                update_datetime TEXT
            )
            """
        )
    init_config_path = os.path.join(os.path.dirname(os.path.abspath("__file__")), "data/InitialData/cars_card.csv")
    if os.path.exists(init_config_path):
        with open(init_config_path, newline='', encoding='utf8') as csvfile:
            csvreader = csv.reader(csvfile)
            head = next(csvreader)
            data = []
            # Rows are only read when the header has exactly 7 columns.
            if len(head) == 7:
                for row in csvreader:
                    # auth_id,card_id,car_type,owner,phone,start_datetime,expire_datetime
                    auth_id = row[0].strip()
                    card_id = row[1].strip()
                    car_type = row[2].strip()
                    owner = row[3].strip()
                    # The phone number is stored through encrypt_number().
                    phone = encrypt_number(row[4].strip())
                    start_datetime = row[5].strip()
                    expire_datetime = row[6].strip()
                    data.append((auth_id, card_id, car_type, owner, phone,
                                 start_datetime, expire_datetime, now_datetime_second()))
            table_handler.executemany(
                f"""
                INSERT OR IGNORE INTO cars_card
                (auth_id, card_id, car_type, owner, phone, start_datetime, expire_datetime, update_datetime)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                """, data
            )

    # cars_auth: table linking an authorization to its plates
    table_handler.query("SELECT name FROM sqlite_master WHERE type='table' AND name='cars_auth'")
    if table_handler.cursor.fetchone() is None:
        table_handler.execute(
            f"""
            CREATE TABLE cars_auth (
                auth_id INTEGER,
                car_id TEXT,
                car_type TEXT,
                registration_id TEXT,
                update_datetime TEXT,
                PRIMARY KEY (auth_id, car_id)
            )
            """
        )
    init_config_path = os.path.join(os.path.dirname(os.path.abspath("__file__")), "data/InitialData/cars_auth.csv")
    if os.path.exists(init_config_path):
        with (open(init_config_path, newline='', encoding='utf8') as csvfile):
            csvreader = csv.reader(csvfile)
            head = next(csvreader)
            data = []
            # Rows are only read when the header has exactly 3 columns.
            if len(head) == 3:
                for row in csvreader:
                    # auth_id,car_id,car_type
                    auth_id = row[0].strip()
                    car_id = row[1].strip()
                    car_type = row[2].strip()
                    # The registration ID is derived from the current time scaled by 10**7.
                    registration_id = str(int(time.time() * (10 ** 7)))
                    data.append((auth_id, car_id, car_type, registration_id, now_datetime_second()))
            table_handler.executemany(
                f"""
                INSERT OR IGNORE INTO cars_auth
                (auth_id, car_id, car_type, registration_id, update_datetime)
                VALUES (?, ?, ?, ?, ?)
                """, data
            )
    # Remove expired authorization records from cars_card & cars_auth
    # The last three characters of the timestamp are dropped before the string comparison
    # (presumably the ":SS" part, to match the "YYYY-MM-DD HH:MM" values — TODO confirm).
    now_datetime = now_datetime_second()[:-3]
    table_handler.query(
        """
        SELECT auth_id FROM cars_card
        WHERE expire_datetime < ?
        """, (now_datetime,)
    )
    res = table_handler.cursor.fetchall()
    if res:
        for i in res:
            table_handler.execute("DELETE FROM cars_card WHERE auth_id = ?", (i[0],))
            table_handler.execute("DELETE FROM cars_auth WHERE auth_id = ?", (i[0],))
|
||
|
||
@staticmethod
def get_parkinglots_id(table_handler: BaseTable, number: str, _type: str):
    """Return the ``id`` of the parkinglots row matching a number and type.

    Raises:
        InvalidException: when no row matches.
    """
    table_handler.query("SELECT id from parkinglots WHERE number = ? AND type = ?", (number, _type))
    rows = table_handler.cursor.fetchall()
    if not rows:
        raise InvalidException(f"{_type} 中不存在编码 {number} ")
    return rows[0][0]
|
||
|
||
@staticmethod
def get_auth_id_by_card_id(table_handler: BaseTable, card_id: str):
    """Return the authorization ID stored for a monthly-card ID.

    Raises:
        InvalidException: when the card ID is not present in cars_card.
    """
    table_handler.query("SELECT auth_id from cars_card WHERE card_id = ?", (card_id,))
    rows = table_handler.cursor.fetchall()
    if not rows:
        raise InvalidException(f"月卡ID {card_id} 不存在 ")
    return rows[0][0]
|
||
|
||
@staticmethod
def get_parkinglots_info(table_handler: BaseTable):
    """List parking lots whose ``running_status`` is set.

    Returns:
        ``{"info": [...]}`` with one dict per row; the row's ``id`` is
        reported under the ``number`` key and a NULL ``factory_name``
        becomes an empty string.
    """
    table_handler.query(
        """
        SELECT id, name, factory_name
        FROM parkinglots
        WHERE type = 'parkinglot' AND running_status is not null
        """
    )
    rows = table_handler.cursor.fetchall()
    if not rows:
        return {"info": []}
    lots = [
        {
            "number": row[0],
            "name": row[1],
            "factory_name": row[2] if row[2] else ""
        }
        for row in rows
    ]
    return {"info": lots}
|
||
|
||
@staticmethod
def get_showed_parkinglot_area_info(table_handler: BaseTable):
    """List the areas that belong to a parking lot with ``running_status`` set.

    Each area is joined with its parking-lot name and with the count and
    concatenated names of the channels attached to it.

    Returns:
        ``{"info": [...]}`` with one dict per area (empty list when none).
    """
    area_sql = """
        SELECT area_number, name, parkinglot_name, '逻辑区域' as area_type, pc.channel_count, pc.channel_name
        FROM (SELECT id area_id, number area_number, name, parkinglot_name
              FROM parkinglots pa
              LEFT JOIN (SELECT id parkinglot_number, name parkinglot_name
                         FROM parkinglots
                         WHERE type = 'parkinglot' AND running_status IS NOT NULL)
              ON parkinglot_number = pa.parkinglot_id
              WHERE type = 'area' AND parkinglot_number IS NOT NULL) pa
        LEFT JOIN (SELECT area_id, count(id) channel_count, GROUP_CONCAT(name) channel_name
                   FROM parkinglots
                   WHERE type = 'channel'
                   GROUP BY area_id) pc ON pc.area_id = pa.area_id
        """
    table_handler.query(area_sql)
    rows = table_handler.cursor.fetchall()
    if not rows:
        return {"info": []}
    keys = ("area_number", "area_name", "parkinglot_name", "area_type", "channel_count", "channel_name")
    areas = [{key: row[pos] for pos, key in enumerate(keys)} for row in rows]
    return {"info": areas}
|
||
|
||
@staticmethod
def get_showed_parkinglot_channel_info(table_handler: BaseTable):
    """List the channels that belong to a parking lot with ``running_status`` set.

    Returns:
        ``{"info": [...]}`` with the number, name, type and parking-lot
        name of each channel (empty list when none).
    """
    channel_sql = """
        SELECT number, name, channel_type, parkinglot_name
        FROM parkinglots pa
        LEFT JOIN (SELECT id parkinglot_number, name parkinglot_name
                   FROM parkinglots
                   WHERE type = 'parkinglot' AND running_status IS NOT NULL)
        ON parkinglot_number = pa.parkinglot_id
        WHERE type = 'channel' AND parkinglot_number IS NOT NULL
        """
    table_handler.query(channel_sql)
    rows = table_handler.cursor.fetchall()
    if not rows:
        return {"info": []}
    keys = ("channel_number", "channel_name", "channel_type", "parkinglot_name")
    channels = [{key: row[pos] for pos, key in enumerate(keys)} for row in rows]
    return {"info": channels}
|
||
|
||
@staticmethod
def get_showed_parkinglot_gate_info(table_handler: BaseTable):
    """List the gates that belong to a parking lot with ``running_status`` set.

    Returns:
        ``{"info": [...]}`` with one dict per gate; a missing channel name
        is reported as an empty string.
    """
    gate_sql = """
        SELECT number, name, is_online, gate_status, running_status, parkinglot_name, channel_name
        FROM parkinglots pa
        LEFT JOIN (SELECT id parkinglot_number, name parkinglot_name
                   FROM parkinglots
                   WHERE type = 'parkinglot' AND running_status IS NOT NULL)
        ON parkinglot_number = pa.parkinglot_id
        LEFT JOIN (SELECT id channel_id, name channel_name
                   FROM parkinglots
                   WHERE type = 'channel') pc
        ON pc.channel_id = pa.channel_id
        WHERE type = 'gate' AND parkinglot_number IS NOT NULL
        """
    table_handler.query(gate_sql)
    rows = table_handler.cursor.fetchall()
    gates = []
    for row in rows or []:
        gate = {
            "gate_number": row[0],
            "gate_name": row[1],
            "is_online": row[2],
            "gate_status": row[3],
            "running_status": row[4],
            "parkinglot_name": row[5],
        }
        gate["channel_name"] = row[6] if row[6] else ""
        gates.append(gate)
    return {"info": gates}
|
||
|
||
@staticmethod
def get_cars_info(table_handler: BaseTable, search_type: Optional[str], search_key: Optional[str],
                  page: Optional[int] = None, limit: Optional[int] = None):
    """Search vehicle cards, optionally filtered and paginated.

    Args:
        table_handler: handler used to run the queries.
        search_type: ``None`` for no filter; ``"owner"`` / ``"car_id"`` do a
            substring match on the owner / concatenated plates; any other
            value is used as a column name for an equality match.
        search_key: value to search for.
        page: 1-based page number; pagination applies only when both
            ``page`` and ``limit`` are given.
        limit: page size.

    Returns:
        ``{"info": [...], "total": n}`` where ``total`` counts all matching
        rows regardless of pagination.

    Raises:
        InvalidException: when ``search_type`` is not a plain column name.
    """
    # All user-supplied values are bound as parameters instead of being
    # formatted into the statement, so quotes in the key cannot alter the SQL.
    where_sql = ""
    where_params = []
    if search_type is not None:
        if search_type == "owner":
            where_sql = "WHERE owner like ?"
            where_params.append(f"%{search_key}%")
        elif search_type == "car_id":
            where_sql = "WHERE car_ids like ?"
            where_params.append(f"%{search_key}%")
        else:
            # A column name cannot be bound, so only identifier characters are allowed.
            if re.fullmatch(r"[A-Za-z_][A-Za-z0-9_.]*", search_type) is None:
                raise InvalidException(f"不支持的搜索类型 {search_type}")
            # NOTE(review): phone is written through encrypt_number() elsewhere in this
            # class, so an equality match on a plain number may never hit — confirm.
            where_sql = f"WHERE {search_type} = ?"
            where_params.append(f"{search_key}")

    page_sql = ""
    page_params = []
    if page is not None and limit is not None:
        page_sql = "LIMIT ? OFFSET ?"
        page_params = [limit, (page - 1) * limit]

    query_sql = f"""
        SELECT cars_card.auth_id, car_ids, car_type, owner, phone, start_datetime, expire_datetime
        FROM cars_card
        LEFT JOIN (SELECT auth_id, GROUP_CONCAT(car_id) car_ids
                   FROM cars_auth GROUP BY auth_id) ca
        ON ca.auth_id = cars_card.auth_id {where_sql}
        """

    def run(sql, params):
        # Keep the single-argument call shape when nothing has to be bound.
        if params:
            table_handler.query(sql, tuple(params))
        else:
            table_handler.query(sql)

    run(f"SELECT COUNT(*) FROM ({query_sql})", where_params)
    total = table_handler.cursor.fetchall()[0][0]
    run(f"{query_sql} {page_sql}", where_params + page_params)
    rows = table_handler.cursor.fetchall()
    info = [
        {
            "auth_id": row[0],
            "car_ids": row[1],
            "car_type": row[2],
            "owner_name": row[3],
            "owner_phone": decrypt_number(row[4]),
            "start_datetime": row[5],
            "expire_datetime": row[6]
        }
        for row in rows or []
    ]
    return {"info": info, "total": total}
|
||
|
||
@staticmethod
def get_auth_info(table_handler: BaseTable, auth_id: int):
    """Return the card details and the plates of one authorization.

    Returns:
        A dict with the card fields, the decrypted owner phone, the list of
        plates (``car_ids``), their count and the associated parking space
        (empty string when unset).

    Raises:
        InvalidException: when the authorization ID is not in cars_card.
    """
    table_handler.query(
        """
        SELECT cars_card.auth_id, car_type, owner, phone, start_datetime, expire_datetime,
               car_ids, associated_parking_space
        FROM cars_card
        LEFT JOIN (SELECT auth_id, GROUP_CONCAT(car_id) car_ids FROM cars_auth GROUP BY auth_id) ca
        ON ca.auth_id = cars_card.auth_id
        WHERE cars_card.auth_id = ?
        """, (auth_id,)
    )
    res = table_handler.cursor.fetchall()
    if not res:
        raise InvalidException("授权ID不存在")
    row = res[0]
    # The LEFT JOIN yields NULL for a card without any cars_auth row;
    # report that as an empty list instead of failing on None.split().
    car_ids = row[6].split(',') if row[6] else []
    return {
        "auth_id": row[0],
        "car_type": row[1],
        "owner_name": row[2],
        "owner_phone": decrypt_number(row[3]),
        "start_datetime": row[4],
        "expire_datetime": row[5],
        "car_ids": car_ids,
        "car_count": len(car_ids),
        "associated_parking_space": row[7] if row[7] else ''
    }
|
||
|
||
@staticmethod
def get_car_id_auth_status(table_handler: BaseTable, car_id: str):
    """Tell whether a plate is linked to a card that has not expired yet.

    Returns:
        ``{"auth_status": True}`` when at least one matching row exists,
        otherwise ``{"auth_status": False}``.
    """
    # Drop the last three characters of the timestamp before comparing it
    # with the stored expiry strings.
    current = now_datetime_second()[:-3]
    status_sql = """
        SELECT cars_auth.auth_id, car_id, start_datetime, expire_datetime
        FROM cars_auth
        LEFT JOIN cars_card ON cars_card.auth_id = cars_auth.auth_id
        WHERE car_id = ? AND expire_datetime > ?
        """
    table_handler.query(status_sql, (car_id, current))
    rows = table_handler.cursor.fetchall()
    return {"auth_status": True if rows else False}
|
||
|
||
@staticmethod
def get_auth_info_by_auth_id(table_handler: BaseTable, auth_id):
    """Collect card, device, plate and registration data of one authorization.

    Returns:
        A 7-tuple ``(card_id, owner_name, owner_phone, car_type,
        device_ids, car_ids, reg_ids)``. Scalar items are ``None`` and list
        items are empty when the corresponding sub-query yields NULL. The
        phone is passed through ``decrypt_number``.
    """
    table_handler.query(
        """
        SELECT
            (SELECT card_id FROM cars_card WHERE auth_id = ?) AS card_id,
            (SELECT owner FROM cars_card WHERE auth_id = ?) AS owner_name,
            (SELECT phone FROM cars_card WHERE auth_id = ?) AS owner_phone,
            (SELECT car_type FROM cars_card WHERE auth_id = ?) AS car_type,
            (SELECT GROUP_CONCAT(device_id) FROM devices_auth WHERE _id = ? AND record_type = '车行') AS device_ids,
            (SELECT GROUP_CONCAT(car_id) FROM cars_auth WHERE auth_id = ?) AS car_ids,
            (SELECT GROUP_CONCAT(cars_auth.registration_id) FROM cars_auth WHERE auth_id = ?) AS reg_ids
        """, (auth_id, auth_id, auth_id, auth_id, auth_id, auth_id, auth_id)
    )
    rows = table_handler.cursor.fetchall()
    card_id = owner_name = owner_phone = car_type = None
    device_ids, car_ids, reg_ids = [], [], []
    if rows:
        record = rows[0]
        card_id, owner_name, car_type = record[0], record[1], record[3]
        if record[2] is not None:
            owner_phone = decrypt_number(record[2])
        # GROUP_CONCAT results are comma-joined strings; NULL stays an empty list.
        if record[4] is not None:
            device_ids = record[4].split(',')
        if record[5] is not None:
            car_ids = record[5].split(',')
        if record[6] is not None:
            reg_ids = record[6].split(',')
    return card_id, owner_name, owner_phone, car_type, device_ids, car_ids, reg_ids
|
||
|
||
@staticmethod
def get_card_id_by_auth_id(table_handler: BaseTable, auth_id: int):
    """Return the monthly-card ID stored for an authorization ID.

    Raises:
        InvalidException: when the authorization ID is not in cars_card.
    """
    table_handler.query("SELECT card_id FROM cars_card WHERE auth_id = ?", (auth_id,))
    rows = table_handler.cursor.fetchall()
    if not rows:
        raise InvalidException(f"授权ID {auth_id} 不存在")
    return rows[0][0]
|
||
|
||
@staticmethod
def get_car_card_interval(table_handler: BaseTable, card_id: str):
    """Return ``(start_datetime, expire_datetime)`` of a monthly card.

    Raises:
        InvalidException: when the card ID is not in cars_card.
    """
    table_handler.query("SELECT start_datetime, expire_datetime FROM cars_card WHERE card_id = ?", (card_id,))
    rows = table_handler.cursor.fetchall()
    if not rows:
        raise InvalidException(f"月卡ID {card_id} 未查询到对应有效期记录")
    first = rows[0]
    return first[0], first[1]
|
||
|
||
@staticmethod
def get_auth_id_interval(table_handler: BaseTable, auth_id: int):
    """Return ``(start_datetime, expire_datetime)`` of the card with this authorization ID.

    Raises:
        InvalidException: when the authorization ID is not in cars_card.
    """
    table_handler.query("SELECT start_datetime, expire_datetime FROM cars_card WHERE auth_id = ?", (auth_id,))
    rows = table_handler.cursor.fetchall()
    if not rows:
        raise InvalidException(f"授权ID {auth_id} 未查询到对应有效期记录")
    first = rows[0]
    return first[0], first[1]
|
||
|
||
@staticmethod
def get_car_reg_id(table_handler: BaseTable, car_id: str):
    """Return the registration ID of the first cars_auth row for a plate.

    Raises:
        InvalidException: when the plate has no cars_auth row.
    """
    table_handler.query("SELECT registration_id FROM cars_auth WHERE car_id = ?", (car_id,))
    rows = table_handler.cursor.fetchall()
    if not rows:
        raise InvalidException(f"Car ID {car_id} 未查询到对应注册ID")
    return rows[0][0]
|
||
|
||
@staticmethod
def add_update_cars_auth_record(table_handler: BaseTable, auth_id: int, car_ids: list, reg_ids: list):
    """Insert the plates of an authorization into cars_auth, or refresh existing rows.

    ``reg_ids[i]`` is stored as the registration ID of ``car_ids[i]``. On a
    primary-key conflict only the registration ID and the update time are
    overwritten.
    """
    upsert_sql = """
        INSERT INTO cars_auth
        (auth_id, car_id, car_type, registration_id, update_datetime)
        VALUES (?, ?, ?, ?, ?)
        ON CONFLICT (auth_id, car_id)
        DO UPDATE SET registration_id = ?, update_datetime = ?
        """
    records = []
    for position, plate in enumerate(car_ids):
        # NOTE(review): a 7-character plate is labelled as new-energy here — confirm
        # this matches the plate format actually stored in car_id.
        plate_kind = "新能源" if len(plate) == 7 else "普通"
        stamp = now_datetime_second()
        reg_id = reg_ids[position]
        # Registration ID and timestamp appear twice: once for INSERT, once for DO UPDATE.
        records.append((auth_id, plate, plate_kind, reg_id, stamp, reg_id, stamp))
    table_handler.executemany(upsert_sql, records)
|
||
|
||
@staticmethod
def add_car_card(table_handler: BaseTable, card_id: str, device_ids: list, reg_ids: list,
                 obj: AddCarsCard | UpdateCarsCard):
    """Persist a new monthly card into cars_card, cars_auth and devices_auth.

    The card row is inserted first (phone passed through ``encrypt_number``),
    its generated authorization ID is read back by card ID, then the plates
    and the per-device authorization records are written.
    """
    insert_sql = """
        INSERT INTO cars_card
        (card_id, car_type, owner, phone, associated_parking_space,
        start_datetime, expire_datetime, update_datetime)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?)
        """
    card_row = (card_id, obj.car_type, obj.owner_name, encrypt_number(obj.owner_phone),
                obj.associated_parking_space, obj.start_datetime, obj.expire_datetime,
                now_datetime_second())
    table_handler.execute(insert_sql, card_row)
    new_auth_id = ParkinglotsTable.get_auth_id_by_card_id(table_handler, card_id)
    # cars_auth
    ParkinglotsTable.add_update_cars_auth_record(table_handler, new_auth_id, obj.car_ids, reg_ids)
    # devices_auth
    valid_from, valid_until = obj.start_datetime, obj.expire_datetime
    for device_id in device_ids:
        DevicesTable.add_update_device_auth(table_handler, device_id, [new_auth_id], "车行",
                                            valid_from, valid_until)
|
||
|
||
@staticmethod
def add_car_card_auth(table_handler: BaseTable, obj: AddCarsCard):
    """Register the vehicles, create the monthly card on every parking device and store the records.

    Steps:
        1. Fetch the parking-system device IDs.
        2. Register every plate on every device.
        3. Create the monthly card on every device.
        4. Write cars_card, cars_auth and devices_auth.

    If any device call in step 2 or 3 fails, everything already pushed to
    the devices is deleted again and an exception is raised.

    Returns:
        ``{"status": True}`` on success.
        NOTE(review): when no parking device exists nothing is done and
        ``None`` is returned; the source had lost its indentation, so confirm
        that the return belongs inside the device check.

    Raises:
        InvalidException: when a registration or card creation fails.
    """
    # 1. Fetch the parking-system device IDs
    device_ids = DevicesTable.get_device_ids(table_handler, filter_name="停车")
    if len(device_ids) > 0:
        sc = ServicesCall()
        vri = VehicleRegistrationItem(
            car_plate_no="",
            registration_id="",
            owner_id=obj.owner_phone,
            owner_name=obj.owner_name,
            registration_time=now_tz_datetime(),
            begin_time=obj.start_datetime,
            action="insert",
            registration_type='0' if "业主" in obj.car_type else '1'
        )
        base_reg_id = int(time.time() * (10 ** 7))
        # The same plate gets the same registration ID on every device.
        # NOTE(review): every plate also receives the same value (base + 1) — confirm
        # whether distinct IDs per plate were intended.
        reg_ids = [str(base_reg_id + 1) for _ in obj.car_ids]
        # Devices on which at least one plate has been registered so far.
        success_device_ids_0 = []

        def rollback_registrations():
            """Delete every plate again on all devices recorded as registered."""
            vri.action = "delete"
            for done_device_id in success_device_ids_0:
                for j, _id in enumerate(obj.car_ids):
                    vri.car_plate_no = _id
                    vri.registration_id = reg_ids[j]
                    sc.vehicle_registration(done_device_id, vri)

        for device_id in device_ids:
            # 2. Register the vehicles; as soon as one device fails, roll back
            #    every device that succeeded before.
            for car_id_index, car_id in enumerate(obj.car_ids):
                vri.car_plate_no = car_id
                vri.registration_id = reg_ids[car_id_index]
                _callback, code, msg = sc.vehicle_registration(device_id, vri)
                if not _callback and "UNIQUE" not in msg:
                    # Roll back whenever anything was registered. The previous
                    # "> 1" guard skipped this when exactly one device had succeeded.
                    rollback_registrations()
                    raise InvalidException(f"车辆注册失败,设备:{device_id},错误码:{code},{msg},已执行回滚")
                if "UNIQUE" in msg:
                    # The entry already exists on the device: retry as an update.
                    # NOTE(review): the action stays "update" for the following calls.
                    vri.action = "update"
                    _callback, code, msg = sc.vehicle_registration(device_id, vri)
                # Record the device once, so a partially registered device is rolled back too.
                if device_id not in success_device_ids_0:
                    success_device_ids_0.append(device_id)

        # 3. Create the monthly card
        multi_car_id = '|'.join(obj.car_ids)
        t = str(int(time.time() * 1000))
        # The same card ID is used on every device; it is built from the millisecond timestamp.
        card_id = t[:8] + '-' + t[8:12] + '-' + t[12:] + "000-0000-000000000000"
        ufci = UpdateFixedCardItem(
            vehicle_owner_id=obj.owner_phone,
            vehicle_owner_name=obj.owner_name,
            vehicle_owner_phone=obj.owner_phone,
            plate_number=multi_car_id,
            effective_date_begin=obj.start_datetime,
            effective_date_end=obj.expire_datetime,
            action="insert",
            card_id=card_id
        )
        success_device_ids_1 = []
        for device_id in device_ids:
            _callback, code, msg = sc.update_fixed_card(device_id, ufci)
            if not _callback:
                # Roll back the cards created so far (also when only one device succeeded)
                ufci.action = "delete"
                for success_device_id in success_device_ids_1:
                    sc.update_fixed_card(success_device_id, ufci)
                # Roll back the vehicle registrations
                rollback_registrations()
                raise InvalidException(f"车辆新建月卡失败,设备:{device_id},错误码:{code},已执行回滚")
            success_device_ids_1.append(device_id)

        # 4. Store the authorization in cars_card & cars_auth & devices_auth
        ParkinglotsTable.add_car_card(table_handler, card_id, device_ids, reg_ids, obj)
        return {"status": True}
|
||
|
||
@staticmethod
def delete_car_auth_id(table_handler: BaseTable, auth_id: int):
    """Remove a monthly card from its devices and delete the stored authorization.

    The card is deleted on every device listed for the authorization. Local
    rows are removed according to the outcome: all devices succeeded →
    everything is deleted and ``{"status": True}`` is returned; some
    succeeded → the card, the plates and the devices_auth rows of the
    successful devices are deleted and an exception reports the failures;
    none succeeded → nothing is deleted and an exception is raised.

    NOTE(review): when the authorization has no devices nothing is deleted
    and ``None`` is returned; nesting was reconstructed from a flattened
    source — confirm.

    Raises:
        InvalidException: when the ID is unknown or any device call fails.
    """
    # 1. Make sure the authorization ID is valid
    if not ParkinglotsTable.exists_auth_id(table_handler, auth_id):
        raise InvalidException(f"授权ID {auth_id} 无效")

    # 2. Fetch the authorized devices and plates of this card
    card_id, owner_name, owner_phone, car_type, device_ids, car_ids, _ = \
        ParkinglotsTable.get_auth_info_by_auth_id(table_handler, auth_id)
    start_date, expire_date = ParkinglotsTable.get_car_card_interval(table_handler, card_id)
    if len(device_ids) == 0:
        return None

    removed_on, fail_ids, fail_code, fail_msg = [], [], [], []
    service = ServicesCall()
    card_item = UpdateFixedCardItem(
        vehicle_owner_id=owner_phone,
        vehicle_owner_name=owner_name,
        vehicle_owner_phone=owner_phone,
        plate_number="",
        effective_date_begin=start_date,
        effective_date_end=expire_date,
        action="delete",
        card_id=card_id
    )
    joined_plates = '|'.join(car_ids)
    for device_id in device_ids:
        card_item.plate_number = joined_plates
        # 3. Remove the card on the device
        ok, code, msg = service.update_fixed_card(device_id, card_item)
        if ok:
            removed_on.append(device_id)
            continue
        fail_ids.append(device_id)
        fail_code.append(code)
        fail_msg.append(msg)

    # 4. Delete the cars_card & cars_auth & devices_auth rows
    if len(removed_on) == len(device_ids):  # every device succeeded
        statements = [
            "DELETE FROM cars_card WHERE auth_id = ?",
            "DELETE FROM cars_auth WHERE auth_id = ?",
            "DELETE FROM devices_auth WHERE _id = ? AND record_type = '车行'",
        ]
        table_handler.execute(statements, [(auth_id,), (auth_id,), (auth_id,)])
        return {"status": True}

    if len(removed_on) > 0:
        statements = [
            "DELETE FROM cars_card WHERE auth_id = ?",
            "DELETE FROM cars_auth WHERE auth_id = ?"
        ]
        bindings = [(auth_id,), (auth_id,)]
        per_device_sql = "DELETE FROM devices_auth WHERE _id = ? AND device_id = ? AND record_type = '车行'"
        # One devices_auth delete per device that confirmed the removal.
        for done_id in removed_on:
            statements.append(per_device_sql)
            bindings.append((auth_id, done_id))
        table_handler.execute(statements, bindings)
        raise InvalidException(f"部分授权移除失败:设备ID {fail_ids}, 错误码: {fail_code}, {fail_msg}")

    raise InvalidException(f"授权移除失败:设备ID {fail_ids}, 错误码: {fail_code}, {fail_msg}")
|
||
|
||
@staticmethod
def delete_car_auth(table_handler: BaseTable, delete_car_list):
    """Delete the cars_auth rows of every plate in ``delete_car_list``."""
    bindings = [(plate,) for plate in delete_car_list]
    table_handler.executemany("DELETE FROM cars_auth WHERE car_id = ?", bindings)
|
||
|
||
@staticmethod
def update_car_card(table_handler: BaseTable, auth_id: int, car_type: str, owner_name: str, owner_phone: str,
                    associated_parking_space: str, start_datetime: str, expire_datetime: str):
    """Overwrite the cars_card row of an authorization with the given values.

    The phone is passed through ``encrypt_number`` and ``update_datetime``
    is set to the current time.
    """
    update_sql = """
        UPDATE cars_card SET
        car_type = ?, owner = ?, phone = ?, associated_parking_space = ?,
        start_datetime = ?, expire_datetime = ?, update_datetime = ?
        WHERE auth_id = ?
        """
    bindings = (car_type, owner_name, encrypt_number(owner_phone), associated_parking_space,
                start_datetime, expire_datetime, now_datetime_second(), auth_id)
    table_handler.execute(update_sql, bindings)
|
||
|
||
@staticmethod
def update_car_auth(table_handler: BaseTable, device_ids: list, reg_ids: list, obj: UpdateCarsCard):
    """Refresh the plates in cars_auth and the validity period in devices_auth."""
    # cars_auth
    ParkinglotsTable.add_update_cars_auth_record(table_handler, obj.auth_id, obj.car_ids, reg_ids)
    # devices_auth
    valid_from, valid_until = obj.start_datetime, obj.expire_datetime
    for device_id in device_ids:
        # NOTE(review): the ID is passed as a string here, while add_car_card passes it as-is.
        DevicesTable.add_update_device_auth(table_handler, device_id, [str(obj.auth_id)], "车行",
                                            valid_from, valid_until)
|
||
|
||
@staticmethod
def update_parkinglot_show(table_handler: BaseTable,
                           _id: str, name: Optional[str], factory_name: Optional[str], is_show: bool):
    """Update the display state of a parking lot, optionally renaming it.

    Args:
        table_handler: handler used to run the statement.
        _id: ``id`` of the parkinglots row of type 'parkinglot'.
        name: new name, or ``None`` to leave it unchanged.
        factory_name: new supplier name, or ``None`` to leave it unchanged.
        is_show: ``True`` stores ``'show'`` in ``running_status``,
            ``False`` stores NULL.

    Returns:
        ``{"status": True}``.

    Raises:
        InvalidException: when ``exists_number`` does not find the ID.
    """
    if not ParkinglotsTable.exists_number(table_handler, _id, 'parkinglot'):
        raise InvalidException(f"不存在编码 {_id} ")
    status = 'show' if is_show else None
    # Values are bound as parameters rather than formatted into the SQL text,
    # so a quote inside a name can neither break nor alter the statement.
    assignments = []
    params = []
    if name is not None:
        assignments.append("name = ?")
        params.append(name)
    if factory_name is not None:
        assignments.append("factory_name = ?")
        params.append(factory_name)
    assignments += ["running_status = ?", "update_datetime = ?"]
    params += [status, now_datetime_second(), _id]
    table_handler.execute(
        f"""
        UPDATE parkinglots
        SET {', '.join(assignments)}
        WHERE type = 'parkinglot' AND id = ?
        """, tuple(params)
    )
    return {"status": True}
|
||
|
||
@staticmethod
|
||
def update_car_card_auth(table_handler: BaseTable, obj: UpdateCarsCard):
|
||
"""Update cars, update the monthly card, and keep the records.

Sends the changes described by ``obj`` to devices through
``ServicesCall.vehicle_registration`` (insert / delete / update per plate)
and ``ServicesCall.update_fixed_card``, then writes them locally through
``update_car_card``, ``update_car_auth`` and ``delete_car_auth``.

:param table_handler: handler passed on to every table helper.
:param obj: requested state; the code reads ``auth_id``, ``car_ids``,
    ``car_type``, ``owner_name``, ``owner_phone``,
    ``associated_parking_space``, ``start_datetime`` and
    ``expire_datetime``, and may reassign ``obj.car_ids``.
:return: ``{"status": True}``.
:raises InvalidException: when ``update_fixed_card`` reports failure for a device.

NOTE(review): the indentation of this block is not visible in this view,
so which statements are guarded by ``if len(device_ids) > 0`` and how the
nested if/else chains pair up must be confirmed against the original file.
"""
|
||
# 1. Fetch the device_ids of the parking-lot system
|
||
device_ids = DevicesTable.get_device_ids(table_handler, filter_name="停车")
|
||
if len(device_ids) > 0:
|
||
old_st, old_et = ParkinglotsTable.get_auth_id_interval(table_handler, obj.auth_id)
|
||
# The first 13 characters of the stored registration id of the first car are
# converted as a millisecond timestamp.
# NOTE(review): presumably reg ids start with a ms timestamp — confirm.
registration_time = millisecond_timestamp2tz(
|
||
ParkinglotsTable.get_car_reg_id(table_handler, str(obj.car_ids[0]))[:13])
|
||
sc = ServicesCall()
|
||
# Template request; plate, registration id and action are filled in per car below.
vri = VehicleRegistrationItem(
|
||
car_plate_no="",
|
||
registration_id="",
|
||
owner_id=obj.owner_phone,
|
||
owner_name=obj.owner_name,
|
||
registration_time=registration_time,
|
||
begin_time=obj.start_datetime,
|
||
action="",
|
||
registration_type='0' if "业主" in obj.car_type else '1'
|
||
)
|
||
# 2. Query the current authorization and derive the already-authorized, to-insert,
#    to-delete and to-update car lists
# NOTE(review): this unpack rebinds device_ids (fetched in step 1) and binds card_id,
# which is assigned again before the card update — confirm both overwrites are intended.
|
||
card_id, owner_name, owner_phone, car_type, device_ids, authed_car_list, authed_reg_ids = \
|
||
ParkinglotsTable.get_auth_info_by_auth_id(table_handler, obj.auth_id)
|
||
insert_car_list = obj.car_ids.copy()
|
||
delete_car_list = []
|
||
update_car_list = []
|
||
# NOTE(review): [:-3] drops the last three characters of the current time string,
# presumably the ":SS" part so it compares with a minute-precision start — confirm.
if (obj.start_datetime < now_datetime_second()[:-3] or obj.start_datetime == old_st) and \
|
||
obj.expire_datetime == old_et and obj.owner_phone == owner_phone and \
|
||
obj.owner_name == owner_name:
|
||
# If validity period, owner name and phone number are unchanged, cars that stay need no
# update; only newly added and removed cars are considered
|
||
for car_id in obj.car_ids:
|
||
if car_id in authed_car_list:
|
||
insert_car_list.remove(car_id)
|
||
for car_id in authed_car_list:
|
||
if car_id not in obj.car_ids:
|
||
delete_car_list.append(car_id)
|
||
else:
|
||
for car_id in obj.car_ids:
|
||
if car_id in authed_car_list:
|
||
insert_car_list.remove(car_id)
|
||
update_car_list.append(car_id)
|
||
for car_id in authed_car_list:
|
||
if car_id not in obj.car_ids:
|
||
delete_car_list.append(car_id)
|
||
|
||
base_reg_id = int(time.time() * (10 ** 7))
|
||
# NOTE(review): the loop variable is unused, so every entry equals base_reg_id + 1 and
# all newly inserted plates share one registration id — confirm this is intended.
reg_ids = [str(base_reg_id + 1) for _ in insert_car_list] # different devices, same plate -> same registration ID
|
||
|
||
# Successful calls are recorded per device so they can be undone in the rollback below.
success_insert_vri = []
|
||
success_delete_vri = []
|
||
success_update_origin_vri = []
|
||
fail_callback = False
|
||
for device_index, device_id in enumerate(device_ids):
|
||
# 3. Register, delete and update cars
|
||
for car_id_index, car_id in enumerate(insert_car_list):
|
||
vri.car_plate_no = car_id
|
||
vri.registration_id = reg_ids[car_id_index]
|
||
vri.action = "insert"
|
||
_callback, code, msg = sc.vehicle_registration(device_id, vri)
|
||
# NOTE(review): which `if` each `else` below belongs to cannot be determined without
# the original indentation; verify the branch structure against the original file.
if _callback or "UNIQUE" not in msg:
|
||
if not _callback and "UNIQUE" not in msg:
|
||
update_car_list.append(car_id)
|
||
else:
|
||
if "UNIQUE" in msg:
|
||
vri.action = "update"
|
||
_callback, _, _ = sc.vehicle_registration(device_id, vri)
|
||
success_insert_vri.append([device_id, vri.copy()])
|
||
else:
|
||
fail_callback = True
|
||
break
|
||
|
||
for car_id in delete_car_list:
|
||
vri.car_plate_no = car_id
|
||
vri.registration_id = ParkinglotsTable.get_car_reg_id(table_handler, car_id)
|
||
vri.action = "delete"
|
||
_callback, code, msg = sc.vehicle_registration(device_id, vri)
|
||
if _callback:
|
||
success_delete_vri.append([device_id, vri.copy()])
|
||
else:
|
||
fail_callback = True
|
||
break
|
||
|
||
for car_id in update_car_list:
|
||
vri.car_plate_no = car_id
|
||
vri.registration_id = ParkinglotsTable.get_car_reg_id(table_handler, car_id)
|
||
vri.action = "update"
|
||
_callback, code, msg = sc.vehicle_registration(device_id, vri)
|
||
if _callback:
|
||
# Put the previous owner / time values back on vri before copying it, so the stored
# copy can be re-sent by the rollback.
# NOTE(review): vri is shared, so these old values stay on it for later iterations, and
# registration_type receives the stored car_type rather than '0'/'1' — confirm.
vri.owner_id = owner_phone
|
||
vri.owner_name = owner_name
|
||
vri.begin_time = old_st
|
||
vri.registration_time = registration_time
|
||
vri.registration_type = car_type
|
||
success_update_origin_vri.append([device_id, vri.copy()])
|
||
else:
|
||
fail_callback = True
|
||
break
|
||
# TODO: rollback logic still needs to be verified
|
||
# NOTE(review): no raise/return is visible after the rollback, so the card update below
# appears to run even when fail_callback is set — confirm.
if fail_callback: # as soon as any device fails, roll back all previously successful devices
|
||
for insert_item in success_insert_vri:
|
||
insert_item[1].action = "delete"
|
||
_callback, _, _ = sc.vehicle_registration(insert_item[0], insert_item[1])
|
||
for delete_item in success_delete_vri:
|
||
delete_item[1].action = "insert"
|
||
_callback, _, _ = sc.vehicle_registration(delete_item[0], delete_item[1])
|
||
for update_item in success_update_origin_vri:
|
||
_callback, _, _ = sc.vehicle_registration(update_item[0], update_item[1])
|
||
|
||
# 3. Update the monthly card
|
||
# All requested plates are sent as one '|'-separated string.
multi_car_id = '|'.join(obj.car_ids)
|
||
card_id = ParkinglotsTable.get_card_id_by_auth_id(table_handler, obj.auth_id)
|
||
ufci = UpdateFixedCardItem(
|
||
vehicle_owner_id=obj.owner_phone,
|
||
vehicle_owner_name=obj.owner_name,
|
||
vehicle_owner_phone=obj.owner_phone,
|
||
plate_number=multi_car_id,
|
||
effective_date_begin=obj.start_datetime,
|
||
effective_date_end=obj.expire_datetime,
|
||
action="update",
|
||
card_id=card_id
|
||
)
|
||
# NOTE(review): this list is only appended to; it is not read anywhere in this function.
success_device_ids_1 = []
|
||
for device_id in device_ids:
|
||
_callback, code, msg = sc.update_fixed_card(device_id, ufci)
|
||
if not _callback:
|
||
raise InvalidException(f"车辆更新月卡失败,设备:{device_id},错误码:{code},{msg}")
|
||
else:
|
||
success_device_ids_1.append(device_id)
|
||
|
||
# 4. Persist the authorization: cars_card & cars_auth & devices_auth
|
||
ParkinglotsTable.update_car_card(table_handler, obj.auth_id, obj.car_type,
|
||
obj.owner_name, obj.owner_phone, obj.associated_parking_space,
|
||
obj.start_datetime, obj.expire_datetime)
|
||
if insert_car_list:
|
||
# Reassigns car_ids on the caller's object so update_car_auth sees only the new plates.
obj.car_ids = insert_car_list
|
||
ParkinglotsTable.update_car_auth(table_handler, device_ids, reg_ids, obj)
|
||
if delete_car_list:
|
||
ParkinglotsTable.delete_car_auth(table_handler, delete_car_list)
|
||
return {"status": True}
|
||
|
||
@staticmethod
def is_number_added(table_handler: BaseTable, number: str, _type: str):
    """Tell whether a parking-lot number has already been added for display.

    A number counts as added when ``parkinglots`` holds a row of the given
    type whose ``running_status`` is not NULL. For type ``"parkinglot"`` the
    number is matched against the ``id`` column, otherwise against ``number``.

    :param table_handler: handler used to run the query and read the cursor.
    :param number: number (or id) to look up.
    :param _type: value of the ``type`` column to match.
    :return: ``True`` when at least one matching row exists, else ``False``.
    """
    number_col = "id" if _type == "parkinglot" else "number"
    sql = f"SELECT id FROM parkinglots WHERE {number_col} = ? and type = ? and running_status is not null"
    table_handler.query(sql, (number, _type))
    rows = table_handler.cursor.fetchall()
    return bool(rows)
|
||
|
||
@staticmethod
def exists(table_handler: BaseTable, _id: str, _type: str):
    """Tell whether ``parkinglots`` has a row with the given ``id`` and ``type``.

    :param table_handler: handler used to run the query and read the cursor.
    :param _id: value matched against the ``id`` column.
    :param _type: value matched against the ``type`` column.
    :return: ``True`` when at least one matching row exists, else ``False``.
    """
    sql = "SELECT id FROM parkinglots WHERE id = ? and type = ?"
    table_handler.query(sql, (_id, _type))
    rows = table_handler.cursor.fetchall()
    return bool(rows)
|
||
|
||
@staticmethod
def exists_number(table_handler: BaseTable, number: str, _type: str):
    """Tell whether ``parkinglots`` has a row of ``_type`` with the given number.

    For type ``"parkinglot"`` the number is matched against the ``id``
    column, otherwise against the ``number`` column.

    :param table_handler: handler used to run the query and read the cursor.
    :param number: number (or id) to look up.
    :param _type: value of the ``type`` column to match.
    :return: ``True`` when at least one matching row exists, else ``False``.
    """
    number_col = "id" if _type == "parkinglot" else "number"
    sql = f"SELECT id FROM parkinglots WHERE {number_col} = ? and type = ?"
    table_handler.query(sql, (number, _type))
    rows = table_handler.cursor.fetchall()
    return bool(rows)
|
||
|
||
@staticmethod
def exists_phone(table_handler: BaseTable, phone: str):
    """Tell whether ``cars_card`` has a row for the given phone number.

    The phone is passed through ``encrypt_number`` before the comparison.

    :param table_handler: handler used to run the query and read the cursor.
    :param phone: plain phone number to look up.
    :return: ``True`` when at least one matching row exists, else ``False``.
    """
    sql = "SELECT auth_id FROM cars_card WHERE phone = ?"
    encrypted_phone = encrypt_number(phone)
    table_handler.query(sql, (encrypted_phone,))
    rows = table_handler.cursor.fetchall()
    return bool(rows)
|
||
|
||
@staticmethod
def exists_auth_id(table_handler: BaseTable, auth_id: int):
    """Tell whether ``cars_card`` has a row with the given ``auth_id``.

    :param table_handler: handler used to run the query and read the cursor.
    :param auth_id: authorization id to look up.
    :return: ``True`` when at least one matching row exists, else ``False``.
    """
    sql = "SELECT auth_id FROM cars_card WHERE auth_id = ?"
    table_handler.query(sql, (auth_id,))
    rows = table_handler.cursor.fetchall()
    return bool(rows)
|