2024-10-24 09:04:38 +08:00

176 lines
5.3 KiB
Python

# -*- coding:utf-8 -*-
"""
@File : utils.py
@Author : xuxingchen
@Version : 1.0
@Contact : xuxingchen@sinochem.com
@Desc : None
"""
import hashlib
from datetime import datetime
import hmac
import base64
import re
import ssl
import string
import time
import uuid
import random
import socket
def extract_number(text: str) -> str:
"""从描述文字中提取数据信息"""
number_match = re.search(r'(\d+)', text)
number = number_match.group(1) if number_match else ""
return number
def extract_building_unit(text: str) -> tuple[str, str]:
"""从空间描述文字中提取从楼栋和单元信息"""
# 提取楼栋数字
building_match = re.search(r'(\d+)号楼', text)
building_number = building_match.group(1).zfill(2) if building_match else ""
# 提取单元数字
unit_match = re.search(r'(\d+)单元', text)
unit_number = unit_match.group(1).zfill(2) if unit_match else ""
return building_number, unit_number
def get_sign(data, key):
hmac_md5 = hmac.new(key.encode('utf-8'), data.encode("utf8"), "MD5")
return hmac_md5.hexdigest()
def encode_to_base64(data: str) -> str:
"""将 字符串 编码为 base64 字符串"""
encoded_data = base64.b64encode(data.encode("utf8"))
return encoded_data.decode("utf8")
def to_snake_case(name: str) -> str:
"""将驼峰命名转换为蛇命名"""
s1 = re.sub('([a-z])([A-Z])', r'\1_\2', name)
return re.sub('([A-Z0-9]+)([A-Z])', r'\1_\2', s1).lower()
def to_obj(data: dict, obj_class: type):
"""将字典数据转换为dataclass对象"""
obj = obj_class()
for key, value in data.items():
attr_name = to_snake_case(key)
if hasattr(obj, attr_name):
setattr(obj, attr_name, value)
obj.check()
return obj
def generate_token():
token_part = str(uuid.uuid4()).replace('-', '')
timestamp_part = str(int(time.time()))
token = token_part[:16] + timestamp_part + token_part[16:]
return token
def generate_time_token(data: str):
timestamp = str(int(time.time()))
data = timestamp + data
# 对连接后的字符串进行哈希
token = hashlib.sha256(data.encode()).hexdigest()
return token[:32]
def generate_random_string(length: int = 6):
characters = string.ascii_letters + string.digits
random_string = ''.join(random.choice(characters) for _ in range(length))
return random_string
def get_ssl_certificate(domain: str, port: int):
# 建立一个加密连接
context = ssl.create_default_context()
with socket.create_connection((domain, port)) as sock:
with context.wrap_socket(sock, server_hostname=domain) as ssock:
# 获取证书
cert = ssl.DER_cert_to_PEM_cert(ssock.getpeercert(True))
return cert
def save_certificate_to_file(cert, filename):
with open(filename, 'w') as file:
file.write(cert)
def datetime_to_timestamp(datetime_str):
# 将日期时间字符串解析为 datetime 对象
dt = datetime.strptime(datetime_str, '%Y-%m-%d %H:%M:%S')
# 将 datetime 对象转换为指定格式的字符串
formatted_dt = dt.strftime('%Y-%m-%dT%H:%M:%S.000Z')
return formatted_dt
def encrypt_string(data: str, key: str):
# 待加密数据和加密密钥转为字节
data_bytes = data.encode('utf-8')
key_bytes = key.encode('utf-8')
# 使用key_bytes对data_bytes每个字节按位异或
encrypted_data = bytearray()
for i in range(len(data_bytes)):
encrypted_data.append(data_bytes[i] ^ key_bytes[i % len(key_bytes)])
# 使用Base64编码以便于存储和传输
return base64.b64encode(encrypted_data).decode('utf-8')
def decrypt_string(data: str, key: str):
# 使用Base64解码获取原始加密数据
encrypted_data = base64.b64decode(data)
key_bytes = key.encode('utf-8')
# 使用key_bytes对encrypted_data每个字节按位异或 解密
decrypted_data = bytearray()
for i in range(len(encrypted_data)):
decrypted_data.append(encrypted_data[i] ^ key_bytes[i % len(key_bytes)])
return decrypted_data.decode('utf-8')
if __name__ == '__main__':
# 定义密钥和要计算哈希的消息
# pid = "4bb9a56bf*********e99bac6d" # sub
# psc = "9b0159706*********0ea665ddd" # sub
pid = "ef8dba7*********42938e04"
psc = "b0b6114771***********b861c58bf7c"
deviceName = "SKHZJMF"
message = f'deviceName{deviceName}productId{pid}'
print(get_sign(message, psc))
print(time.time() * 1000)
encrypt_number = encrypt_string("18136222222", "meian")
print(encrypt_number)
print(decrypt_string(encrypt_number, "meian"))
from devices.meian_model import Register
register = to_obj({
"dataType": "register",
"deviceId": "ma17qt7k",
"devicePositionCode": "YFCS",
"devicePositionDesc": "\u4e91\u670d\u6d4b\u8bd5",
"deviceType": 1,
"factoryId": "MeianTech",
"token": "1"
}, Register)
print(encode_to_base64("hello"))
print(register)
print(register.__dict__)
print("\u4e91\u670d\u6d4b\u8bd5")
print(generate_token())
print(generate_time_token("langzihan"))
print(datetime_to_timestamp("2024-04-08 15:19:55"))
# cert = get_ssl_certificate("iot-broker-test.*********.com", 38883)
# save_certificate_to_file(cert, "*********.crt")