210 lines
8.4 KiB
Python
210 lines
8.4 KiB
Python
# -*- coding:utf-8 -*-
|
||
"""
|
||
@File : app
|
||
@Author : xuxingchen
|
||
@Version : 0.1.6
|
||
@Contact : xuxingchen@sinochem.com
|
||
@Desc : 将中转mqtt与北向设备端进行连接
|
||
"""
|
||
import base64
|
||
import json
|
||
import os.path
|
||
import time
|
||
import traceback
|
||
from urllib import request
|
||
|
||
import logger
|
||
from logger import Logger
|
||
from utils import generate_random_string
|
||
from devices.meian_db import SQLiteDatabaseEngine, BaseTable, DeviceTable, GatewayTable, UserInfoTable, \
|
||
HeartBeatTable, RegisterTable, RecordTable
|
||
from devices.common_service import auto_service
|
||
from devices.common_model import UserData
|
||
from devices.yunfu_model import MeianProduct
|
||
from misc import create_mqtt_client
|
||
from config import DB_PATH, BROKER_0_HOST, BROKER_0_PORT, BROKER_0_API_PORT, BROKER_0_API_KEY, BROKER_0_API_SECRET, \
|
||
BROKER_0_USERNAME, BROKER_0_PASSWD, BROKER_0_SSL, BROKER_1_HOST, BROKER_1_PORT, BROKER_1_SSL
|
||
|
||
|
||
def on_connect(client, userdata, _flags, rc):
|
||
Logger.connect(f"{userdata.client_id} mqtt connecting: {{rc: {rc}}}")
|
||
if userdata.topics:
|
||
_topics = [(topic, 0) for topic in userdata.topics]
|
||
client.subscribe(_topics)
|
||
Logger.debug(f"{userdata.client_id} subscribe topics: {userdata.topics}")
|
||
|
||
|
||
def on_message(_client, userdata: UserData, message):
|
||
try:
|
||
msg = json.loads(message.payload.decode().replace("\x00", ""))
|
||
if msg.get("dataType", False) and msg.get("dataType") != "heartBeat":
|
||
Logger.debug(f"💡 {userdata.client_id} 接收到新数据,执行 on_message 中 ...", log_path=None)
|
||
Logger.info(f"{message.topic}: {msg}")
|
||
userdata.set_topic(message.topic)
|
||
if userdata.table_handler is None:
|
||
_db = SQLiteDatabaseEngine(db_path=DB_PATH)
|
||
userdata.set_table_handler(BaseTable(_db.connection, _db.cursor))
|
||
if userdata.token:
|
||
if "messageId" in msg.keys() and userdata.token == msg["messageId"]: # 兼容aiot平台
|
||
userdata.set_status_add("status", True)
|
||
userdata.set_status_add("response", msg)
|
||
if "token" in msg.keys() and userdata.token == msg["token"] and userdata.code == msg["dataType"]: # 兼容美安
|
||
userdata.set_status_add("status", True)
|
||
userdata.set_status_add("response", msg)
|
||
else:
|
||
auto_service(msg, userdata)
|
||
except Exception as e:
|
||
Logger.error(f"{type(e).__name__}, {e}")
|
||
if logger.DEBUG:
|
||
traceback.print_exc()
|
||
|
||
|
||
def on_disconnect(_client, _userdata, rc):
|
||
Logger.disconnect(f"Break mqtt connection! {{rc: {rc}}}")
|
||
|
||
|
||
def gen_gateway_info(th):
|
||
# 构建每个网关应当订阅的主题列表
|
||
info = {}
|
||
# 获取网关信息
|
||
gateway_list = GatewayTable.get_registered_gateway(th)
|
||
assert gateway_list is not None, "请优先完善网关配置表"
|
||
for gateway_item in gateway_list:
|
||
sub_aiot_id_list = json.loads(GatewayTable.get_registered_sub_aiot_id(th, gateway_item[0])[0])
|
||
# 追加每个网关下每个子设备的主题
|
||
if sub_aiot_id_list:
|
||
info[gateway_item[0]] = {
|
||
"sct": gateway_item[1],
|
||
"topics": [
|
||
f"/jmlink/{aiot_id}/tml/service/call" for aiot_id in sub_aiot_id_list
|
||
]
|
||
}
|
||
else:
|
||
info[gateway_item[0]] = {
|
||
"sct": gateway_item[1],
|
||
"topics": []
|
||
}
|
||
return info
|
||
|
||
|
||
def check_ids(ids, host, port, key, secret):
|
||
"""检查每个id对应的客户端认证"""
|
||
assert ids is not None, "数据库中无设备数据"
|
||
id_list = [_id[0] for _id in ids]
|
||
url = f"http://{host}:{port}/api/v5/authentication/password_based:built_in_database/users"
|
||
headers = {
|
||
"Content-Type": "application/json",
|
||
"Authorization": "Basic " + base64.b64encode((key + ":" + secret).encode()).decode()
|
||
}
|
||
for project_code in id_list:
|
||
body_data = {
|
||
"user_id": f"{project_code}@meian",
|
||
"password": MeianProduct.secret,
|
||
"is_superuser": False
|
||
}
|
||
body_json = json.dumps(body_data).encode('utf-8')
|
||
req = request.Request(url, data=body_json, headers=headers)
|
||
try:
|
||
with request.urlopen(req) as f:
|
||
response = json.loads(f.read().decode())
|
||
status_code = f.getcode()
|
||
if status_code == 201:
|
||
if "user_id" in response:
|
||
Logger.init(f"为项目{project_code}新增用户")
|
||
except Exception as e:
|
||
if "HTTP Error 409" in str(e): # 用户已存在
|
||
pass
|
||
else:
|
||
Logger.error(f"{type(e).__name__}, {e}")
|
||
if logger.DEBUG:
|
||
traceback.print_exc()
|
||
|
||
|
||
if __name__ == '__main__':
|
||
db = SQLiteDatabaseEngine(db_path=DB_PATH) # 此时数据表中应当已具备数据
|
||
table_handler = BaseTable(db.connection, db.cursor)
|
||
# 数据库自检初始化
|
||
Logger.init("数据库执行初始化校验 ...")
|
||
GatewayTable.check(table_handler)
|
||
DeviceTable.check(table_handler)
|
||
HeartBeatTable.check(table_handler)
|
||
RegisterTable.check(table_handler)
|
||
UserInfoTable.check(table_handler)
|
||
RecordTable.check(table_handler)
|
||
Logger.init("数据库完成初始化校验 ✅")
|
||
|
||
# 设备mqtt认证客户端初始化,确保每个设备的账户都存在
|
||
Logger.init("中转mqtt认证客户端账户执行初始化校验 ...")
|
||
ids = GatewayTable.get_project_code_list(table_handler)
|
||
check_ids(ids, BROKER_0_HOST, BROKER_0_API_PORT, BROKER_0_API_KEY, BROKER_0_API_SECRET)
|
||
Logger.init("中转mqtt认证客户端账户完成初始化校验 ✅")
|
||
|
||
# 1. 获取待订阅的主题
|
||
topics = json.loads(DeviceTable.get_topics(table_handler)[1]) # 0: subscribe, 1: publish
|
||
# 根据现有的设备表信息结合网关表信息构建aiot平台服务调用的主题
|
||
gateway_info = gen_gateway_info(table_handler)
|
||
|
||
client_dict = {}
|
||
# 2. 构建客户端
|
||
user_data_0 = UserData()
|
||
user_data_0.set_topics(topics) # 用于连接时订阅
|
||
user_data_0.set_status_add("client", 0)
|
||
user_data_0.set_client_id(f"mqtt-python-{generate_random_string()}")
|
||
user_data_0.set_clients(client_dict)
|
||
client_0 = create_mqtt_client(
|
||
broker_host=BROKER_0_HOST,
|
||
broker_port=BROKER_0_PORT,
|
||
userdata=user_data_0,
|
||
on_connect=on_connect,
|
||
on_message=on_message,
|
||
client_id=user_data_0.client_id,
|
||
username=BROKER_0_USERNAME,
|
||
password=BROKER_0_PASSWD,
|
||
ssl_flag=BROKER_0_SSL
|
||
)
|
||
client_0.loop_start()
|
||
client_dict["center"] = [client_0, user_data_0]
|
||
|
||
for gateway_id in gateway_info.keys():
|
||
user_data_1 = UserData()
|
||
user_data_1.set_topics(gateway_info[gateway_id]["topics"]) # 用于连接时订阅
|
||
user_data_1.set_status_add("client", 1)
|
||
user_data_1.set_status_add("last_timestamp", time.time())
|
||
user_data_1.set_status_add("gateway_id", gateway_id)
|
||
user_data_1.set_client_id(f"device|1|1|{gateway_id}")
|
||
client_1 = create_mqtt_client(
|
||
broker_host=BROKER_1_HOST,
|
||
broker_port=BROKER_1_PORT,
|
||
userdata=user_data_1,
|
||
on_connect=on_connect,
|
||
on_disconnect=on_disconnect,
|
||
on_message=on_message,
|
||
client_id=user_data_1.client_id,
|
||
username=gateway_id,
|
||
password=gateway_info[gateway_id]["sct"],
|
||
ssl_flag=BROKER_1_SSL
|
||
)
|
||
client_1.loop_start()
|
||
client_dict[gateway_id] = [client_1, user_data_1]
|
||
user_data_1.set_clients(client_dict)
|
||
|
||
while True:
|
||
time.sleep(86400)
|
||
t = str(int((time.time() - 86400) * 1000))
|
||
UserInfoTable.delete_redundancy_1(table_handler, t)
|
||
UserInfoTable.delete_redundancy_0(table_handler, t)
|
||
# 每7天清除一次日志文件
|
||
creation_time = os.path.getctime(logger.LOGGER_PATH)
|
||
days_since_creation = (time.time() - creation_time) / (60 * 60 * 24)
|
||
if os.path.exists(logger.LOGGER_PATH) and days_since_creation >= 7:
|
||
try:
|
||
f0 = open(logger.LOGGER_PATH, "r", encoding="utf8")
|
||
f1 = open(f"{logger.LOGGER_PATH}.old", "w", encoding="utf8")
|
||
f1.write(f0.read())
|
||
f1.close()
|
||
f0.close()
|
||
os.remove(logger.LOGGER_PATH)
|
||
print(f"日志文件 {logger.LOGGER_PATH} 完成重置")
|
||
except Exception as e:
|
||
print(f"日志文件 {logger.LOGGER_PATH} 重置失败: {e}")
|