# -*- coding:utf-8 -*-
"""
@File    :   utils.py
@Author  :   xuxingchen
@Version :   1.0
@Contact :   xuxingchen@sinochem.com
@Desc    :   None
"""
import hashlib
from datetime import datetime
import hmac
import base64
import re
import ssl
import string
import time
import uuid
import random
import socket


def extract_number(text: str) -> str:
    """Return the first run of digits found in *text*, or "" if there is none."""
    number_match = re.search(r'(\d+)', text)
    return number_match.group(1) if number_match else ""


def extract_building_unit(text: str) -> tuple[str, str]:
    """Extract the building and unit numbers from a location description.

    The building number is the digits immediately before "号楼" and the unit
    number is the digits immediately before "单元". Each is left-padded with
    zeros to at least two characters; a missing part is returned as "".

    Returns:
        A ``(building_number, unit_number)`` tuple of strings.
    """
    # Building number
    building_match = re.search(r'(\d+)号楼', text)
    building_number = building_match.group(1).zfill(2) if building_match else ""
    # Unit number
    unit_match = re.search(r'(\d+)单元', text)
    unit_number = unit_match.group(1).zfill(2) if unit_match else ""
    return building_number, unit_number


def get_sign(data, key):
    """Return the hex HMAC-MD5 digest of *data* keyed with *key*.

    Both arguments are strings and are UTF-8 encoded before hashing.
    """
    hmac_md5 = hmac.new(key.encode('utf-8'), data.encode("utf8"), "MD5")
    return hmac_md5.hexdigest()


def encode_to_base64(data: str) -> str:
    """Encode the UTF-8 bytes of *data* as a base64 string."""
    encoded_data = base64.b64encode(data.encode("utf8"))
    return encoded_data.decode("utf8")


def to_snake_case(name: str) -> str:
    """Convert a camelCase name to snake_case.

    An underscore is inserted between a lowercase letter and a following
    uppercase letter, then between a run of uppercase letters/digits and a
    following uppercase letter; the result is lower-cased.
    """
    s1 = re.sub(r'([a-z])([A-Z])', r'\1_\2', name)
    return re.sub(r'([A-Z0-9]+)([A-Z])', r'\1_\2', s1).lower()


def to_obj(data: dict, obj_class: type):
    """Populate a new *obj_class* instance from the dict *data*.

    *obj_class* is instantiated with no arguments. Every key of *data* is
    converted with ``to_snake_case``; when the instance already has an
    attribute of that name it is overwritten with the value, otherwise the
    key is ignored. Finally ``obj.check()`` is called, so *obj_class* must
    provide a ``check`` method.

    Returns:
        The populated instance.
    """
    obj = obj_class()
    for key, value in data.items():
        attr_name = to_snake_case(key)
        if hasattr(obj, attr_name):
            setattr(obj, attr_name, value)
    obj.check()
    return obj


def generate_token():
    """Return a token made of a random UUID4 with the current time embedded.

    The 32 hex characters of the UUID are split in half and the current Unix
    time (whole seconds, decimal) is inserted between the two halves.
    """
    token_part = uuid.uuid4().hex
    timestamp_part = str(int(time.time()))
    return token_part[:16] + timestamp_part + token_part[16:]


def generate_time_token(data: str):
    """Return the first 32 hex characters of SHA-256(<unix seconds> + data)."""
    timestamp = str(int(time.time()))
    # Hash the timestamp-prefixed string
    digest = hashlib.sha256((timestamp + data).encode()).hexdigest()
    return digest[:32]


def generate_random_string(length: int = 6):
    """Return *length* characters drawn at random from ASCII letters and digits.

    NOTE(review): this uses the ``random`` module, which is not suitable for
    security-sensitive values -- confirm how callers use the result.
    """
    characters = string.ascii_letters + string.digits
    return ''.join(random.choice(characters) for _ in range(length))


def get_ssl_certificate(domain: str, port: int):
    """Connect to ``domain:port`` over TLS and return the peer certificate as PEM.

    The connection uses ``ssl.create_default_context()`` with *domain* as the
    server hostname.
    """
    # Open an encrypted connection
    context = ssl.create_default_context()
    with socket.create_connection((domain, port)) as sock:
        with context.wrap_socket(sock, server_hostname=domain) as ssock:
            # Fetch the certificate in DER form and convert it to PEM
            cert = ssl.DER_cert_to_PEM_cert(ssock.getpeercert(True))
    return cert


def save_certificate_to_file(cert, filename):
    """Write the certificate text *cert* to *filename*, replacing any content."""
    # Explicit encoding so the output does not depend on the platform locale.
    with open(filename, 'w', encoding='utf-8') as file:
        file.write(cert)


def datetime_to_timestamp(datetime_str):
    """Reformat ``'YYYY-MM-DD HH:MM:SS'`` as ``'YYYY-MM-DDTHH:MM:SS.000Z'``.

    Despite the name, the result is a string, not a numeric timestamp. The
    ``.000Z`` suffix is a literal: no time-zone conversion is performed.

    Raises:
        ValueError: if *datetime_str* does not match ``'%Y-%m-%d %H:%M:%S'``.
    """
    # Parse the input string into a datetime object
    dt = datetime.strptime(datetime_str, '%Y-%m-%d %H:%M:%S')
    # Render it in the target format
    return dt.strftime('%Y-%m-%dT%H:%M:%S.000Z')


def _xor_with_key(payload: bytes, key_bytes: bytes) -> bytes:
    """XOR every byte of *payload* with *key_bytes*, repeating the key as needed."""
    if not key_bytes:
        # An empty key would otherwise fail with an opaque ZeroDivisionError.
        raise ValueError("key must not be empty")
    key_len = len(key_bytes)
    return bytes(byte ^ key_bytes[i % key_len] for i, byte in enumerate(payload))


def encrypt_string(data: str, key: str):
    """Obfuscate *data* with a repeating-key XOR and return it base64-encoded.

    Both *data* and *key* are UTF-8 encoded before the XOR. This is a
    reversible obfuscation (see ``decrypt_string``), not strong encryption.

    Raises:
        ValueError: if *key* is empty.
    """
    xored = _xor_with_key(data.encode('utf-8'), key.encode('utf-8'))
    # Base64-encode so the result is safe to store and transmit as text
    return base64.b64encode(xored).decode('utf-8')


def decrypt_string(data: str, key: str):
    """Reverse ``encrypt_string``: base64-decode *data*, XOR with *key*, decode UTF-8.

    Raises:
        ValueError: if *key* is empty.
    """
    # Base64-decode to recover the XOR-ed bytes
    encrypted_data = base64.b64decode(data)
    return _xor_with_key(encrypted_data, key.encode('utf-8')).decode('utf-8')


if __name__ == '__main__':
    # Define the key and the message to hash
    # pid = "4bb9a56bf*********e99bac6d"  # sub
    # psc = "9b0159706*********0ea665ddd"  # sub
    pid = "ef8dba7*********42938e04"
    psc = "b0b6114771***********b861c58bf7c"
    deviceName = "SKHZJMF"
    message = f'deviceName{deviceName}productId{pid}'
    print(get_sign(message, psc))
    print(time.time() * 1000)
    encrypt_number = encrypt_string("18136222222", "meian")
    print(encrypt_number)
    print(decrypt_string(encrypt_number, "meian"))
    from devices.meian_model import Register

    register = to_obj({
        "dataType": "register",
        "deviceId": "ma17qt7k",
        "devicePositionCode": "YFCS",
        "devicePositionDesc": "\u4e91\u670d\u6d4b\u8bd5",
        "deviceType": 1,
        "factoryId": "MeianTech",
        "token": "1"
    }, Register)
    print(encode_to_base64("hello"))
    print(register)
    print(register.__dict__)
    print("\u4e91\u670d\u6d4b\u8bd5")
    print(generate_token())
    print(generate_time_token("langzihan"))
    print(datetime_to_timestamp("2024-04-08 15:19:55"))
    # cert = get_ssl_certificate("iot-broker-test.*********.com", 38883)
    # save_certificate_to_file(cert, "*********.crt")