2024-10-24 09:23:39 +08:00

114 lines
4.1 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding:utf-8 -*-
"""
@Author : xuxingchen
@Contact : xuxingchen@sinochem.com
@Desc : 内置的产品信息表在初始化时会初始化csv进行覆盖
"""
import csv
import os
from datetime import datetime
from pydantic import BaseModel
from utils import logger
from utils.database import BaseTable
from utils.misc import now_datetime_second
class Product(BaseModel):
product_id: str
product_name: str
node_type: str
class ProductsTable(BaseTable):
@staticmethod
def check(table_handler: BaseTable):
"""检测是否存在当前表"""
init_config_path = os.path.join(os.path.dirname(os.path.abspath("__file__")), "data/InitialData/products.csv")
if os.path.exists(init_config_path):
table_handler.execute("DROP TABLE IF EXISTS products;")
table_handler.execute(
f"""
CREATE TABLE products (
product_id TEXT,
product_name TEXT,
node_type TEXT,
update_datetime TEXT,
PRIMARY KEY (product_id)
)
"""
)
if os.path.exists(init_config_path):
with open(init_config_path, newline='', encoding='utf8') as csvfile:
csvreader = csv.reader(csvfile)
head = next(csvreader)
data = []
if len(head) == 3:
for row in csvreader:
product_id = row[0].strip()
product_name = row[1].strip()
node_type = row[2].strip()
update_datetime = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
data.append((product_id, product_name, node_type, update_datetime))
table_handler.executemany(
f"""
INSERT INTO products
(product_id, product_name, node_type, update_datetime)
VALUES (?, ?, ?, ?)
""",
data
)
table_handler.query("SELECT name FROM sqlite_master WHERE type='table' AND name='brands'")
if table_handler.cursor.fetchone() is None:
table_handler.execute(
f"""
CREATE TABLE brands (
brand_id INT,
brand_name TEXT,
status TEXT,
add_datetime TEXT,
update_datetime TEXT,
PRIMARY KEY (brand_id)
)
"""
)
init_config_path = os.path.join(os.path.dirname(os.path.abspath("__file__")), "data/InitialData/brands.csv")
if os.path.exists(init_config_path):
with open(init_config_path, newline='', encoding='utf8') as csvfile:
csvreader = csv.reader(csvfile)
head = next(csvreader)
data = []
if len(head) == 3:
for row in csvreader:
brand_id = row[0].strip()
brand_name = row[1].strip()
status = row[2].strip()
data.append((brand_id, brand_name, status, now_datetime_second(), now_datetime_second()))
table_handler.executemany(
f"""
INSERT INTO brands
(brand_id, brand_name, status, add_datetime, update_datetime)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT (brand_id) DO NOTHING
""",
data
)
@staticmethod
def get_product_info(table_handler: BaseTable, product_id: str):
table_handler.query(
"""
SELECT product_name, node_type
FROM products
WHERE product_id = ?
""",
(product_id,)
)
res = table_handler.cursor.fetchall()
if res:
return Product(product_id=product_id, product_name=res[0][0], node_type=res[0][1])
else:
return None