2024-10-24 09:04:38 +08:00

210 lines
8.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding:utf-8 -*-
"""
@File : app
@Author : xuxingchen
@Version : 0.1.6
@Contact : xuxingchen@sinochem.com
@Desc : 将中转mqtt与北向设备端进行连接
"""
import base64
import json
import os.path
import time
import traceback
from urllib import request
import logger
from logger import Logger
from utils import generate_random_string
from devices.meian_db import SQLiteDatabaseEngine, BaseTable, DeviceTable, GatewayTable, UserInfoTable, \
HeartBeatTable, RegisterTable, RecordTable
from devices.common_service import auto_service
from devices.common_model import UserData
from devices.yunfu_model import MeianProduct
from misc import create_mqtt_client
from config import DB_PATH, BROKER_0_HOST, BROKER_0_PORT, BROKER_0_API_PORT, BROKER_0_API_KEY, BROKER_0_API_SECRET, \
BROKER_0_USERNAME, BROKER_0_PASSWD, BROKER_0_SSL, BROKER_1_HOST, BROKER_1_PORT, BROKER_1_SSL
def on_connect(client, userdata, _flags, rc):
    """Log the connection result and subscribe to the topics held by *userdata*.

    Args:
        client: MQTT client that just connected; used to issue the subscription.
        userdata: per-client state object; ``client_id`` and ``topics`` are read.
        _flags: connection flags supplied by the MQTT library (unused).
        rc: connection result code, only written to the log.
    """
    Logger.connect(f"{userdata.client_id} mqtt connecting: {{rc: {rc}}}")
    if not userdata.topics:
        # Nothing configured for this client, so there is nothing to subscribe to.
        return
    # Each topic is paired with 0 (presumably the QoS level - confirm against
    # the MQTT library in use).
    subscriptions = [(name, 0) for name in userdata.topics]
    client.subscribe(subscriptions)
    Logger.debug(f"{userdata.client_id} subscribe topics: {userdata.topics}")
def on_message(_client, userdata: UserData, message):
    """Handle one incoming MQTT message for the client owning *userdata*.

    The payload is decoded, stripped of NUL characters and parsed as JSON.
    If ``userdata.token`` is set, the message is only checked against that
    token and, on a match, stored as the awaited response; otherwise it is
    handed to ``auto_service``.  Any exception is logged, never propagated.

    Args:
        _client: MQTT client that received the message (unused).
        userdata: per-client state object (token, code, table handler, ...).
        message: received message; ``payload`` and ``topic`` are read.
    """
    try:
        decoded = message.payload.decode()
        msg = json.loads(decoded.replace("\x00", ""))

        # Heartbeats (and messages without a dataType) are not logged.
        data_type = msg.get("dataType")
        if data_type and data_type != "heartBeat":
            Logger.debug(f"💡 {userdata.client_id} 接收到新数据,执行 on_message 中 ...", log_path=None)
            Logger.info(f"{message.topic}: {msg}")

        userdata.set_topic(message.topic)

        # Create the database handle lazily, on the first message seen by this client.
        if userdata.table_handler is None:
            engine = SQLiteDatabaseEngine(db_path=DB_PATH)
            userdata.set_table_handler(BaseTable(engine.connection, engine.cursor))

        if not userdata.token:
            # No reply is being awaited: dispatch the message normally.
            auto_service(msg, userdata)
            return

        # A reply is being awaited: accept it when it carries the expected token.
        if "messageId" in msg and userdata.token == msg["messageId"]:
            # AIoT-platform style reply, correlated through "messageId".
            userdata.set_status_add("status", True)
            userdata.set_status_add("response", msg)
        if "token" in msg and userdata.token == msg["token"] and userdata.code == msg["dataType"]:
            # Meian style reply, correlated through "token" plus "dataType".
            userdata.set_status_add("status", True)
            userdata.set_status_add("response", msg)
    except Exception as e:
        Logger.error(f"{type(e).__name__}, {e}")
        if logger.DEBUG:
            traceback.print_exc()
def on_disconnect(_client, _userdata, rc):
    """Log that an MQTT connection was closed, together with its result code.

    Args:
        _client: MQTT client that disconnected (unused).
        _userdata: per-client state object (unused).
        rc: disconnect result code, only written to the log.
    """
    notice = f"Break mqtt connection! {{rc: {rc}}}"
    Logger.disconnect(notice)
def gen_gateway_info(th):
    """Build the credentials and subscription topics for every registered gateway.

    Args:
        th: table handler passed through to the ``GatewayTable`` queries.

    Returns:
        dict mapping ``gateway_item[0]`` of each registered gateway row to
        ``{"sct": gateway_item[1], "topics": [...]}``, where ``topics`` holds
        one ``/jmlink/<aiot_id>/tml/service/call`` entry per sub-device id
        (an empty list when the gateway has no sub-devices).

    Raises:
        AssertionError: if the registered-gateway query returns ``None``.
    """
    gateway_list = GatewayTable.get_registered_gateway(th)
    # Raised explicitly (instead of ``assert``) so the check survives ``python -O``;
    # the exception type and message are unchanged.
    if gateway_list is None:
        raise AssertionError("请优先完善网关配置表")

    info = {}
    for gateway_item in gateway_list:
        gateway_id = gateway_item[0]
        # The first column of the query result is a JSON-encoded list of sub-device ids.
        sub_aiot_id_list = json.loads(GatewayTable.get_registered_sub_aiot_id(th, gateway_id)[0])
        info[gateway_id] = {
            "sct": gateway_item[1],
            # ``or []`` covers an empty or null id list, yielding no topics.
            "topics": [
                f"/jmlink/{aiot_id}/tml/service/call" for aiot_id in (sub_aiot_id_list or [])
            ],
        }
    return info
def check_ids(ids, host, port, key, secret):
    """Make sure an authentication user exists on the broker for every project code.

    For each project code a user ``<project_code>@meian`` with the password
    ``MeianProduct.secret`` is POSTed to the broker's HTTP API.  A 409 answer
    (user already exists) is ignored; any other failure is logged and the
    remaining project codes are still processed.

    Args:
        ids: iterable of rows whose first element is the project code.
        host: host of the broker HTTP API.
        port: port of the broker HTTP API.
        key: API key used for HTTP basic authentication.
        secret: API secret used for HTTP basic authentication.

    Raises:
        AssertionError: if *ids* is ``None``.
    """
    from urllib.error import HTTPError  # local import: only needed by this check

    # Raised explicitly (instead of ``assert``) so the check survives ``python -O``;
    # the exception type and message are unchanged.
    if ids is None:
        raise AssertionError("数据库中无设备数据")

    id_list = [_id[0] for _id in ids]
    url = f"http://{host}:{port}/api/v5/authentication/password_based:built_in_database/users"
    credentials = base64.b64encode(f"{key}:{secret}".encode()).decode()
    headers = {
        "Content-Type": "application/json",
        "Authorization": "Basic " + credentials
    }
    for project_code in id_list:
        body_data = {
            "user_id": f"{project_code}@meian",
            "password": MeianProduct.secret,
            "is_superuser": False
        }
        body_json = json.dumps(body_data).encode('utf-8')
        # Supplying ``data`` makes urllib issue a POST request.
        req = request.Request(url, data=body_json, headers=headers)
        try:
            with request.urlopen(req) as f:
                response = json.loads(f.read().decode())
                status_code = f.getcode()
            if status_code == 201 and "user_id" in response:
                Logger.init(f"为项目{project_code}新增用户")
        except Exception as e:
            # 409 means the user already exists; rely on the status code
            # rather than on the wording of the exception message.
            if isinstance(e, HTTPError) and e.code == 409:
                continue
            Logger.error(f"{type(e).__name__}, {e}")
            if logger.DEBUG:
                traceback.print_exc()
if __name__ == '__main__':
    # Entry point: verify the database, make sure the broker accounts exist,
    # start one client on broker 0 plus one client per gateway on broker 1,
    # then run a daily maintenance loop forever.
    db = SQLiteDatabaseEngine(db_path=DB_PATH)  # the tables are expected to hold data already
    table_handler = BaseTable(db.connection, db.cursor)

    # Database self-check.
    Logger.init("数据库执行初始化校验 ...")
    GatewayTable.check(table_handler)
    DeviceTable.check(table_handler)
    HeartBeatTable.check(table_handler)
    RegisterTable.check(table_handler)
    UserInfoTable.check(table_handler)
    RecordTable.check(table_handler)
    Logger.init("数据库完成初始化校验 ✅")

    # Make sure an authentication account exists on broker 0 for every project code.
    Logger.init("中转mqtt认证客户端账户执行初始化校验 ...")
    ids = GatewayTable.get_project_code_list(table_handler)
    check_ids(ids, BROKER_0_HOST, BROKER_0_API_PORT, BROKER_0_API_KEY, BROKER_0_API_SECRET)
    Logger.init("中转mqtt认证客户端账户完成初始化校验 ✅")

    # 1. Collect the topics to subscribe to.
    topics = json.loads(DeviceTable.get_topics(table_handler)[1])  # 0: subscribe, 1: publish
    # Per-gateway credentials and service-call topics, built from the gateway table.
    gateway_info = gen_gateway_info(table_handler)
    # Shared registry of every client; the same dict object is handed to each
    # UserData, so entries added later are visible to all of them.
    client_dict = {}

    # 2. Build the clients: first the one connected to broker 0.
    user_data_0 = UserData()
    user_data_0.set_topics(topics)  # subscribed in on_connect
    user_data_0.set_status_add("client", 0)
    user_data_0.set_client_id(f"mqtt-python-{generate_random_string()}")
    user_data_0.set_clients(client_dict)
    client_0 = create_mqtt_client(
        broker_host=BROKER_0_HOST,
        broker_port=BROKER_0_PORT,
        userdata=user_data_0,
        on_connect=on_connect,
        on_message=on_message,
        client_id=user_data_0.client_id,
        username=BROKER_0_USERNAME,
        password=BROKER_0_PASSWD,
        ssl_flag=BROKER_0_SSL
    )
    client_0.loop_start()
    client_dict["center"] = [client_0, user_data_0]

    # Then one client per gateway, connected to broker 1 with the gateway's own credentials.
    for gateway_id, gateway_cfg in gateway_info.items():
        user_data_1 = UserData()
        user_data_1.set_topics(gateway_cfg["topics"])  # subscribed in on_connect
        user_data_1.set_status_add("client", 1)
        user_data_1.set_status_add("last_timestamp", time.time())
        user_data_1.set_status_add("gateway_id", gateway_id)
        user_data_1.set_client_id(f"device|1|1|{gateway_id}")
        client_1 = create_mqtt_client(
            broker_host=BROKER_1_HOST,
            broker_port=BROKER_1_PORT,
            userdata=user_data_1,
            on_connect=on_connect,
            on_disconnect=on_disconnect,
            on_message=on_message,
            client_id=user_data_1.client_id,
            username=gateway_id,
            password=gateway_cfg["sct"],
            ssl_flag=BROKER_1_SSL
        )
        client_1.loop_start()
        client_dict[gateway_id] = [client_1, user_data_1]
        # Done per gateway so every gateway client can reach the shared registry.
        user_data_1.set_clients(client_dict)

    # Daily maintenance loop; the MQTT clients run in their own loop threads.
    while True:
        time.sleep(86400)
        # Millisecond timestamp of 24 hours ago, passed as the cut-off
        # (presumably rows older than this are purged - confirm in UserInfoTable).
        t = str(int((time.time() - 86400) * 1000))
        UserInfoTable.delete_redundancy_1(table_handler, t)
        UserInfoTable.delete_redundancy_0(table_handler, t)

        # Reset the log file once it is at least 7 days old.
        # NOTE(review): on Unix getctime() is the last metadata-change time, not
        # the creation time, so an actively written log may never look 7 days
        # old - confirm on the deployment platform.
        try:
            creation_time = os.path.getctime(logger.LOGGER_PATH)
            days_since_creation = (time.time() - creation_time) / (60 * 60 * 24)
            if days_since_creation >= 7:
                # Move the current log to "<path>.old" (overwriting a previous
                # one) in a single step instead of copying it through memory.
                os.replace(logger.LOGGER_PATH, f"{logger.LOGGER_PATH}.old")
                print(f"日志文件 {logger.LOGGER_PATH} 完成重置")
        except FileNotFoundError:
            # No log file yet: nothing to reset, and the loop must keep running.
            pass
        except Exception as e:
            print(f"日志文件 {logger.LOGGER_PATH} 重置失败: {e}")