代码拉取完成,页面将自动刷新
from PyQt6.QtWidgets import (QMainWindow, QWidget, QVBoxLayout,
QPushButton, QTextEdit, QHBoxLayout,
QGroupBox, QFileDialog, QLineEdit, QLabel, QMessageBox, QProgressDialog, QProgressBar)
from PyQt6.QtCore import Qt, QThread, pyqtSignal
import requests
import pandas as pd
from datetime import datetime
import time # 添加time模块导入
from PyQt6.QtWidgets import QApplication
# 添加新的工作线程类
class ExportWorkerThread(QThread):
progress_updated = pyqtSignal(int, int) # 当前进度, 总页数
data_chunk_ready = pyqtSignal(list) # 数据块就绪信号
error_occurred = pyqtSignal(str) # 错误信号
finished_with_count = pyqtSignal(int) # 完成信号,带有总数
def __init__(self, api_key, group_id):
super().__init__()
self.api_key = api_key
self.group_id = group_id
self.should_stop = False
def get_total_devices(self):
"""获取分组内设备总数"""
try:
response = requests.get(
"https://iot-api.heclouds.com/devicegroup/detail",
headers={"Authorization": self.api_key},
params={"group_id": self.group_id},
verify=False,
timeout=10
)
if response.status_code == 200:
data = response.json()
if data.get('code') == 0:
return data.get('data', {}).get('device_count', 0)
return 0
except Exception as e:
self.error_occurred.emit(f"获取设备总数失败: {str(e)}")
return 0
def run(self):
try:
# 首先获取设备总数
total_devices = self.get_total_devices()
if total_devices == 0:
self.error_occurred.emit("未能获取到设备总数或分组内无设备")
return
# 计算总页数
limit = 20 # API限制最大为20
total_pages = -(-total_devices // limit) # 向上取整除法
all_devices = []
offset = 0
for current_page in range(1, total_pages + 1):
if self.should_stop:
break
params = {
"group_id": self.group_id,
"limit": limit,
"offset": offset
}
try:
response = requests.get(
"https://iot-api.heclouds.com/devicegroup/devices",
headers={"Authorization": self.api_key},
params=params,
verify=False,
timeout=10
)
if response.status_code != 200:
self.error_occurred.emit(f"API调用失败: {response.status_code}")
break
data = response.json()
if data.get('code') != 0:
self.error_occurred.emit(f"API返回错误: {data.get('msg')}")
break
current_batch = data.get('data', [])
if current_batch:
all_devices.extend(current_batch)
self.progress_updated.emit(current_page, total_pages)
offset += len(current_batch)
time.sleep(0.02) # 20ms延时
except Exception as e:
self.error_occurred.emit(f"请求第 {current_page}/{total_pages} 页时出错: {str(e)}")
break
if all_devices:
self.data_chunk_ready.emit(all_devices)
self.finished_with_count.emit(len(all_devices))
except Exception as e:
self.error_occurred.emit(str(e))
class MainWindow(QMainWindow):
def __init__(self):
super().__init__()
self.setWindowTitle("OneNET分组批量操作工具 v1.3")
self.setMinimumSize(800, 600)
# 创建主窗口部件
main_widget = QWidget()
self.setCentralWidget(main_widget)
# 创建主布局
layout = QVBoxLayout()
main_widget.setLayout(layout)
# 添加说明标签
note_label = QLabel("注意:导入和删除功能使用的Excel模板与OneNET平台分组批量添加设备的模板相同")
note_label.setStyleSheet("color: red;") # 设置文字颜色为红色
layout.addWidget(note_label)
# 创建功能区组
function_group = QGroupBox("功能区")
function_layout = QHBoxLayout()
# 创建功能按钮
self.import_btn = QPushButton("分组批量导入设备")
self.export_btn = QPushButton("分组批量导出设备")
self.delete_btn = QPushButton("分组批量删除设备")
# 添加输入框到功能区
self.group_id_input = QLineEdit()
self.api_key_input = QLineEdit()
self.group_id_label = QLabel("分组ID:")
self.api_key_label = QLabel("API Key:")
# 修改功能区布局
function_layout.addWidget(self.group_id_label)
function_layout.addWidget(self.group_id_input)
function_layout.addWidget(self.api_key_label)
function_layout.addWidget(self.api_key_input)
# 添加按钮到功能区
function_layout.addWidget(self.import_btn)
function_layout.addWidget(self.export_btn)
function_layout.addWidget(self.delete_btn)
function_group.setLayout(function_layout)
# 创建日志区域的标题和控制按钮
log_header = QWidget()
log_header_layout = QHBoxLayout()
log_header_layout.setContentsMargins(0, 0, 0, 0)
log_label = QLabel("操作日志")
self.clear_log_btn = QPushButton("清除日志")
self.clear_log_btn.clicked.connect(self.clear_log)
log_header_layout.addWidget(log_label)
log_header_layout.addStretch() # 添加弹性空间
log_header_layout.addWidget(self.clear_log_btn)
log_header.setLayout(log_header_layout)
# 创建日志显示区
self.log_text = QTextEdit()
self.log_text.setReadOnly(True)
# 将部件添加到主布局
layout.addWidget(function_group)
layout.addWidget(log_header) # 添加日志区域标题和控制按钮
layout.addWidget(self.log_text)
# 连接信号和槽
self.import_btn.clicked.connect(self.import_devices)
self.export_btn.clicked.connect(self.export_devices)
self.delete_btn.clicked.connect(self.delete_devices)
# 添加帮助菜单
menubar = self.menuBar()
help_menu = menubar.addMenu('帮助')
# 添加关于动作
about_action = help_menu.addAction('关于')
about_action.triggered.connect(self.show_about)
def clear_log(self):
"""清除日志内容"""
self.log_text.clear()
def add_separator_to_log(self):
"""添加分割线到日志"""
separator = "-" * 80 # 创建分割线
self.log_text.append(f"\n{separator}\n")
QApplication.processEvents()
def process_devices(self, df, api_key, group_id, operation_type="import"):
"""处理设备的通用方法"""
try:
# 添加操作开始的时间戳和分割线
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
self.log_text.append(f"\n操作开始时间: {current_time}")
self.add_separator_to_log()
# 验证必要的列是否存在
required_columns = ['产品ID', '设备名称']
if not all(col in df.columns for col in required_columns):
self.log_text.append("错误:Excel文件必须包含以下列:产品ID、设备名称")
return
# 将产品ID转为字符串类型并按产品ID分组
df['产品ID'] = df['产品ID'].astype(str)
product_groups = df.groupby('产品ID')
# 计算总批次数
total_batches = 0
for _, product_df in product_groups:
total_batches += -(-len(product_df) // 100) # 向上取整除法
self.log_text.append(f"检测到 {len(product_groups)} 个不同的产品ID,共 {total_batches} 个批次需要处理")
QApplication.processEvents()
# 确定API端点
api_endpoint = "add-devices" if operation_type == "import" else "del-devices"
api_url = f"https://iot-api.heclouds.com/devicegroup/{api_endpoint}"
# 创建进度对话框
progress_dialog = QProgressDialog("正在处理设备...", "取消", 0, total_batches, self)
progress_dialog.setWindowTitle("处理进度")
progress_dialog.setWindowModality(Qt.WindowModality.WindowModal)
current_batch = 0
total_success = 0
total_devices = len(df)
# 遍历每个产品
for product_id, product_df in product_groups:
self.log_text.append(f"\n开始处理产品ID: {product_id}, 共 {len(product_df)} 个设备")
QApplication.processEvents()
product_success = 0
error_stats = {} # 修改错误统计的数据结构 {error_key: {'count': 0, 'devices': []}}
# 将设备名称列表分成最多100个一组
device_names = product_df['设备名称'].astype(str).tolist()
batches = [device_names[i:i + 100] for i in range(0, len(device_names), 100)]
# 处理每个批次
for batch_index, device_batch in enumerate(batches, 1):
if progress_dialog.wasCanceled():
self.log_text.append("\n操作已取消")
return
current_batch += 1
progress_dialog.setValue(current_batch)
progress_dialog.setLabelText(
f"正在处理产品 {product_id}\n"
f"批次进度: {current_batch}/{total_batches}"
)
request_data = {
"group_id": group_id,
"product_id": product_id,
"device_name_list": device_batch
}
try:
response = requests.post(
api_url,
headers={
"Authorization": api_key,
"Content-Type": "application/json"
},
json=request_data,
verify=False,
timeout=10
)
if response.status_code == 200:
result = response.json()
if result.get('code') == 0:
data = result.get('data', {})
succ_count = len(data.get('succ_devices', []))
fail_devices = data.get('failed_devices', [])
# 更新成功计数
product_success += succ_count
# 统计错误并记录失败设备名称
for device in fail_devices:
error_code = device.get('code')
error_msg = device.get('msg')
device_name = device.get('device_name', '未知设备')
error_key = f"错误码: {error_code}, 原因: {error_msg}"
if error_key not in error_stats:
error_stats[error_key] = {
'count': 0,
'devices': []
}
error_stats[error_key]['count'] += 1
error_stats[error_key]['devices'].append(device_name)
# 输出批次进度
self.log_text.append(
f"批次 {current_batch}/{total_batches}: "
f"成功 {succ_count} 个, "
f"失败 {len(fail_devices)} 个"
)
else:
self.log_text.append(f"批次 {current_batch} 处理失败:{result.get('msg')}")
else:
self.log_text.append(f"批次 {current_batch} API调用失败,状态码:{response.status_code}")
except Exception as e:
self.log_text.append(f"批次 {current_batch} 发生错误:{str(e)}")
QApplication.processEvents()
time.sleep(0.05) # 避免请求过于频繁
# 输出当前产品的错误统计(修改这部分以包含设备名称)
if error_stats:
self.log_text.append(f"\n产品 {product_id} 错误统计:")
for error_key, error_info in error_stats.items():
self.log_text.append(
f"{error_key} - 影响设备数: {error_info['count']}\n"
f"失败设备列表: {', '.join(error_info['devices'])}"
)
QApplication.processEvents()
# 输出当前产品的处理结果
total_success += product_success
self.log_text.append(
f"\n产品 {product_id} 处理完成:"
f"成功 {product_success} 个,"
f"失败 {len(product_df) - product_success} 个"
)
QApplication.processEvents()
# 输出总体结果
self.log_text.append(
f"\n所有产品处理完成!"
f"总成功:{total_success} 个,"
f"总设备数:{total_devices} 个"
)
QApplication.processEvents()
# 在处理完成后添加结束时间戳和分割线
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
self.log_text.append(f"\n操作结束时间: {current_time}")
self.add_separator_to_log()
except Exception as e:
self.log_text.append(f"处理过程中出错: {str(e)}")
import traceback
self.log_text.append(traceback.format_exc())
# 发生错误时也添加分割线
self.add_separator_to_log()
def import_devices(self):
"""导入设备到分组"""
file_name, _ = QFileDialog.getOpenFileName(
self,
"选择导入文件",
"",
"Excel Files (*.xlsx *.xls);;All Files (*)"
)
if not file_name:
return
try:
# 获取必要参数
api_key = self.api_key_input.text().strip()
group_id = self.group_id_input.text().strip()
# 验证参数
if not all([api_key, group_id]):
self.log_text.append("错误:请输入API Key和分组ID")
self.add_separator_to_log()
return
# 读取Excel文件
self.log_text.append(f"正在读取文件:{file_name}")
df = pd.read_excel(file_name)
# 处理设备
QApplication.setOverrideCursor(Qt.CursorShape.WaitCursor)
try:
self.process_devices(df, api_key, group_id, "import")
finally:
QApplication.restoreOverrideCursor()
except Exception as e:
self.log_text.append(f"导入过程中出错: {str(e)}")
import traceback
self.log_text.append(traceback.format_exc())
self.add_separator_to_log()
def export_devices(self):
"""导出分组内的设备"""
group_id = self.group_id_input.text().strip()
api_key = self.api_key_input.text().strip()
if not all([group_id, api_key]):
self.log_text.append("错误:请输入分组ID和API Key")
self.add_separator_to_log()
return
file_name, _ = QFileDialog.getSaveFileName(
self,
"选择保存位置",
f"设备导出_{datetime.now().strftime('%Y%m%d_%H%M%S')}.xlsx",
"Excel Files (*.xlsx);;All Files (*)"
)
if not file_name:
return
# 创建进度对话框
self.progress_dialog = QProgressDialog("正在获取设备总数...", "取消", 0, 100, self)
self.progress_dialog.setWindowTitle("导出进度")
self.progress_dialog.setWindowModality(Qt.WindowModality.WindowModal)
self.progress_dialog.setAutoClose(True)
self.progress_dialog.setAutoReset(True)
# 添加操作开始的时间戳和分割线
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
self.log_text.append(f"\n导出开始时间: {current_time}")
self.add_separator_to_log()
# 创建并配置工作线程
self.export_thread = ExportWorkerThread(api_key, group_id)
self.export_thread.progress_updated.connect(self.update_export_progress)
self.export_thread.data_chunk_ready.connect(lambda data: self.save_to_excel(data, file_name))
self.export_thread.error_occurred.connect(self.handle_export_error)
self.export_thread.finished_with_count.connect(self.handle_export_finished)
# 连接取消信号
self.progress_dialog.canceled.connect(self.export_thread.terminate)
# 启动线程
self.export_thread.start()
def update_export_progress(self, current_page, total_pages):
"""更新导出进度显示"""
progress = int((current_page / total_pages) * 100)
self.progress_dialog.setValue(progress)
self.progress_dialog.setLabelText(
f"正在导出设备数据...\n"
f"当前进度: {progress}%\n"
f"页码: {current_page}/{total_pages}"
)
self.log_text.append(f"已完成第 {current_page}/{total_pages} 页数据获取")
QApplication.processEvents()
def handle_export_error(self, error_msg):
"""处理导出错误"""
self.log_text.append(f"导出错误: {error_msg}")
self.add_separator_to_log()
QApplication.processEvents()
def handle_export_finished(self, count):
"""处理导出完成"""
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
self.log_text.append(f"导出完成!共导出 {count} 条设备数据")
self.log_text.append(f"导出结束时间: {current_time}")
self.add_separator_to_log()
QApplication.processEvents()
def save_to_excel(self, devices_data, file_name):
try:
df = pd.DataFrame(devices_data)
expected_columns = [
'id', 'name', 'status', 'create_time',
'activate_time', 'last_connect_time', 'imei',
'group_id', 'enable_status', 'private',
'obev', 'access_id', 'data_pt', 'sec_key'
]
available_columns = [col for col in expected_columns if col in df.columns]
df = df[available_columns]
with pd.ExcelWriter(file_name, engine='openpyxl') as writer:
df.to_excel(writer, index=False)
worksheet = writer.sheets['Sheet1']
for idx, col in enumerate(df.columns):
max_length = max(
df[col].astype(str).apply(len).max(),
len(col)
)
worksheet.column_dimensions[chr(65 + idx)].width = max_length + 2
except Exception as e:
self.log_text.append(f"保存Excel文件时出错: {str(e)}")
def delete_devices(self):
"""从分组中删除设备"""
file_name, _ = QFileDialog.getOpenFileName(
self,
"选择要删除的设备文件",
"",
"Excel Files (*.xlsx *.xls);;All Files (*)"
)
if not file_name:
return
try:
# 获取必要参数
api_key = self.api_key_input.text().strip()
group_id = self.group_id_input.text().strip()
# 验证参数
if not all([api_key, group_id]):
self.log_text.append("错误:请输入API Key和分组ID")
self.add_separator_to_log()
return
# 读取Excel文件
self.log_text.append(f"正在读取文件:{file_name}")
df = pd.read_excel(file_name)
# 处理设备
QApplication.setOverrideCursor(Qt.CursorShape.WaitCursor)
try:
self.process_devices(df, api_key, group_id, "delete")
finally:
QApplication.restoreOverrideCursor()
except Exception as e:
self.log_text.append(f"删除过程中出错: {str(e)}")
import traceback
self.log_text.append(traceback.format_exc())
self.add_separator_to_log()
def show_about(self):
QMessageBox.about(
self,
"关于",
"OneNET分组批量操作工具\n\n"
"用于批量管理OneNET设备分组的工具\n"
"支持分组批量导入、导出和删除操作\n\n"
"该批量工具仅作开发测试\n\n"
"若程序运行中出现中断或其他未响应情况,请关闭程序后重试,或自行处理异常\n\n"
)
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。