工作

使用pyqt开发一个工作效率工具TBox_Log_Tool

Posted on 2023-04-11,13 min read
封面图

pyqt在实际工作中的应用

以往手工测试时的低效点

  • 每执行一条用例都需要打开三个CMD窗口分别记录 adb log、service log和监看OCPP log,每次执行完一条用例都需要重新在CMD窗口输入adb命令,非常繁琐
  • 每执行一条用例都需要记录以太网报文,并且在wireshark中点击另存以太网报文
  • 测试设备在1L车间中,冬冷夏热,测试时需要频繁进出空调间去操作设备的HMI,插拔设备的SIM卡

通过该工具,改进了以上的三个点

  • 只需要简单的点击就可以同时记录3种 log
  • 支持对开发板射频模块的打开和关闭
  • 集成了开源项目scrcpy,使测试人员在电脑上实现对充电桩操作界面的远程控制

待优化的功能(公司倒闭,不用优化了🤡)

  • 使用python多线程对adb设备的意外断开做处理

工具效果展示与功能讲解

  • 第一行控制测试设备(TBox)的连接状态,如连接了多个adb设备,可在下拉列表中选择TBox设备
  • 第二行是用serial库来对TBox发送指定的AT命令来启用/禁用射频模块,达到使TBox在线/离线的效果
  • 第二行的“连接HMI”使用scrcpy工具来对HMI进行投屏操作,方便测试人员在电脑上操作和观察HMI
  • 第三行日志名中一般填入用例编号,如果用例多次执行,程序会自动在新的log文件名中加入“_1”、“_2”以此类推
  • 第四行使用Tshark列出电脑上所有的网卡,勾选“录制以太网报文”后再点击“开始录制”会同步记录wireshark报文
  • 第四行的“弹出ocpplog”点击后会弹出ocpplog的实时监看窗口
  • 使用pyinstaller对程序进行打包后即可生成exe可执行文件

具体实现

import sys
import time
import os
import subprocess
import serial as ser
import configparser
import datetime
from PyQt5.QtWidgets import QApplication, QWidget, QLabel, QPushButton, QLineEdit, QVBoxLayout, QHBoxLayout, QComboBox, QMessageBox,QCheckBox

class LogRecorder(QWidget):
    def __init__(self):
        super().__init__()
        self.devices = []
        self.device = None
        self.case_name = None
        self.logcat_process = None
        self.ocpplog_process = None
        self.init_ui()

    def init_ui(self):
        self.setWindowTitle("TBox Log Tool")
        # 创建控件
        device_label = QLabel("ADB设备:")
        self.device_combo = QComboBox()
        self.refresh_button = QPushButton("刷新")
        self.connect_button = QPushButton("连接")
        self.disconnect_button = QPushButton("断开")
        self.disconnect_button.setEnabled(False)
        case_label = QLabel("日志名:")
        self.case_edit = QLineEdit()
        self.case_edit.setEnabled(False)
        self.record_button = QPushButton("开始录制")
        self.record_button.setEnabled(False)
        self.stop_button = QPushButton("停止录制")
        self.stop_button.setEnabled(False)
        atport_label = QLabel("AT COM口:")
        self.atport_edit = QLineEdit()
        self.offline_button = QPushButton("关闭射频模块")
        self.online_button = QPushButton("打开射频模块")
        self.online_button.setEnabled(False)
        self.hmi_button = QPushButton("连接HMI")
        net_label = QLabel("选择网口:")
        self.net_device_combo = QComboBox()
        self.net_capture_enable = QCheckBox("录制以太网报文")
        self.ocpplog_button = QPushButton("弹出ocpplog")
        self.ocpplog_button.setEnabled(False)



        # 布局控件
        device_layout = QHBoxLayout()
        device_layout.addWidget(device_label)
        device_layout.addWidget(self.device_combo)
        device_layout.addWidget(self.refresh_button)
        device_layout.addWidget(self.connect_button)
        device_layout.addWidget(self.disconnect_button)
        offline_layout = QHBoxLayout()
        offline_layout.addWidget(atport_label)
        offline_layout.addWidget(self.atport_edit)
        offline_layout.addWidget(self.offline_button)
        offline_layout.addWidget(self.online_button)
        offline_layout.addWidget(self.hmi_button)
        case_layout = QHBoxLayout()
        case_layout.addWidget(case_label)
        case_layout.addWidget(self.case_edit)
        case_layout.addWidget(self.record_button)
        case_layout.addWidget(self.stop_button)
        netcapure_layout = QHBoxLayout()
        netcapure_layout.addWidget(net_label)
        netcapure_layout.addWidget(self.net_device_combo)
        netcapure_layout.addWidget(self.net_capture_enable)
        netcapure_layout.addWidget(self.ocpplog_button)
        main_layout = QVBoxLayout()
        main_layout.addLayout(device_layout)
        main_layout.addLayout(offline_layout)
        main_layout.addLayout(case_layout)
        main_layout.addLayout(netcapure_layout)
        self.setLayout(main_layout)

        # 绑定事件
        self.refresh_button.clicked.connect(self.refresh_devices)
        self.connect_button.clicked.connect(self.connect_device)
        self.disconnect_button.clicked.connect(self.disconnect_device)
        self.offline_button.clicked.connect(self.offline)
        self.online_button.clicked.connect(self.online)
        self.hmi_button.clicked.connect(self.connect_hmi)
        self.record_button.clicked.connect(self.start_record)
        self.stop_button.clicked.connect(self.stop_record)
        self.ocpplog_button.clicked.connect(self.open_ocpp)

        # 初始化设备列表
        self.refresh_devices()
        self.refresh_net_devices()
        # print(self.initInterfaceDict())

    def getConfigValue(self, name): # 获取config.ini中的配置 当前配置项只有一个tshark路径
        self.config = configparser.ConfigParser()
        self.config.read('config.ini')
        value = self.config.get("config", name)
        return value

    def refresh_devices(self):
        # 刷新设备列表
        devices = self.get_devices()
        if devices != self.devices:
            self.devices = devices
            self.device_combo.clear()
            self.device_combo.addItems(self.devices)

    def get_devices(self):
        # 获取设备列表
        devices = []
        try:
            if os.name == 'nt':
                startupinfo = subprocess.STARTUPINFO()
                startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
                startupinfo.wShowWindow = subprocess.SW_HIDE
            else:
                startupinfo = None
            output = subprocess.check_output(["./scrcpy-win64-v1.25/adb.exe", "devices"],startupinfo=startupinfo).decode("utf-8")
            for line in output.splitlines()[1:]:
                if line.strip():
                    devices.append(line.split("\t")[0])
        except subprocess.CalledProcessError:
            pass
        return devices

    def refresh_net_devices(self):
        # 刷新设备列表
        net_devices = self.initInterfaceDict()
        # print(net_devices)
        # if net_devices != self.initInterfaceDict:
        self.net_devices = net_devices
        #     self.net_device_combo.clear()
        self.net_device_combo.addItems(self.net_devices)
        #     print(net_devices)

    def initInterfaceDict(self):
        # 获取接口列表
        tshark_path = self.getConfigValue("tshark_path")
        cmd = f'{tshark_path} -D'
        # cmd = '"C:/Program Files/Wireshark/tshark.exe" -D'
        result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
        interfaceList = result.stdout.decode().strip().split('\r\n')
        # print(interfaceList)

        interfaceList_dict = {}
        for item in interfaceList:
            key = item.split('(', 1)[1].strip(') ')  # 获取括号中的内容
            value_list = [s for s in item.split() if s.startswith('\\')]
            if value_list:  # 检查列表是否为空
                value = value_list[0]  # 获取以"\\Device\\"开头的字符串
                interfaceList_dict[key] = value

        # print(interfaceList_dict)
        return interfaceList_dict

    def connect_device(self):
        # 连接设备
        self.device = self.device_combo.currentText()
        if not self.device:
            QMessageBox.warning(self, "警告", "请选择一个设备")
            return
        try:
            # subprocess.check_output(["adb", "-s", self.device, "shell", "echo", "connected"])
            if os.name == 'nt':
                startupinfo = subprocess.STARTUPINFO()
                startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
                startupinfo.wShowWindow = subprocess.SW_HIDE
            else:
                startupinfo = None
            name = "root"
            password = "*************"
            self.process = subprocess.Popen(["./scrcpy-win64-v1.25/adb.exe", "-s", self.device, "shell"], stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                                       stderr=subprocess.PIPE,startupinfo=startupinfo,
                                       encoding="utf-8", universal_newlines=True)
            # self.wait()
            # print(self.stdout.read())
            # print("输入用户名")
            time.sleep(3)
            self.process.stdin.write(name + '\n')
            self.process.stdin.flush()
            # # print("输入密码")
            time.sleep(3)
            self.process.stdin.write(password + '\n')
            self.process.stdin.flush()
            self.connect_button.setEnabled(False)
            self.disconnect_button.setEnabled(True)
            self.record_button.setEnabled(True)
            self.case_edit.setEnabled(True)
            self.ocpplog_button.setEnabled(True)
        except subprocess.CalledProcessError:
            QMessageBox.warning(self, "警告", "连接设备失败")


    def disconnect_device(self):
        # 断开设备
        self.device = None
        self.connect_button.setEnabled(True)
        self.stop_button.setEnabled(False)
        self.stop_record()
        self.case_edit.setEnabled(False)
        self.record_button.setEnabled(False)
        self.ocpplog_button.setEnabled(False)
        self.disconnect_button.setEnabled(False)

    def offline(self):
        # 获取端口和波特率输入框中的值
        self.port = self.atport_edit.text()
        if not self.port:
            QMessageBox.warning(self, "警告", "请输入AT COM口如‘COM19’")
            return

        # 执行离线命令
        ser_obj = ser.Serial(port=self.port, baudrate=115200)
        ser_obj.write("AT+CFUN=4\r".encode())
        self.online_button.setEnabled(True)
        self.offline_button.setEnabled(False)

    def online(self):
        # 获取端口和波特率输入框中的值
        self.port = self.atport_edit.text()
        if not self.port:
            QMessageBox.warning(self, "警告", "请输入AT COM口如‘COM19’")
            return

        # 执行在线命令
        ser_obj = ser.Serial(port=self.port, baudrate=115200)
        ser_obj.write("AT+CFUN=1\r".encode())
        self.online_button.setEnabled(False)
        self.offline_button.setEnabled(True)

    def connect_hmi(self):
        # 检查设备状态
        if os.name == 'nt':
            startupinfo = subprocess.STARTUPINFO()
            startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
            startupinfo.wShowWindow = subprocess.SW_HIDE
        else:
            startupinfo = None
        hmi_devices = self.get_devices()
        print(hmi_devices)
        if '192.168.1.80:5555' in hmi_devices:
            # 设备已连接,直接启动scrcpy
            cmd = '.\scrcpy-win64-v1.25\scrcpy.exe -s 192.168.1.80 --always-on-top --max-size 640 --lock-video-orientation=2'
            self.process = subprocess.Popen(cmd, shell=True, encoding="utf-8",startupinfo=startupinfo)
        else:
            # 设备未连接,先连接设备再启动scrcpy
            # cmd = "./scrcpy-win64-v1.25/adb.exe", "connect" "192.168.1.80"
            self.process = subprocess.Popen(["./scrcpy-win64-v1.25/adb.exe", "connect", "192.168.1.80"], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding="utf-8", universal_newlines=True,startupinfo=startupinfo)
            time.sleep(5)
            cmd = '.\scrcpy-win64-v1.25\scrcpy.exe -s 192.168.1.80 --always-on-top --max-size 640 --lock-video-orientation=2'
            self.process = subprocess.Popen(cmd, shell=True, encoding="utf-8",startupinfo=startupinfo)

    def start_record(self):
        # 开始录制
        self.case_name = self.case_edit.text().strip()
        if not self.case_name:
            QMessageBox.warning(self, "警告", "请输入日志名")
            return
        if self.logcat_process or self.ocpplog_process:
            QMessageBox.warning(self, "警告", "正在录制中")
            return
        # 创建以当前日期为名字的文件夹
        current_date = datetime.datetime.now().strftime("%Y-%m-%d")
        self.log_folder = os.path.join(os.getcwd(), "Log", current_date)
        if not os.path.exists(self.log_folder):
            os.makedirs(self.log_folder)

        logcat_file = os.path.join(self.log_folder, self.get_logcat_file())
        ocpplog_file = os.path.join(self.log_folder, self.get_ocpplog_file())
        netlog_file = os.path.join(self.log_folder, self.get_netlog_file())

        if os.name == 'nt':
            startupinfo = subprocess.STARTUPINFO()
            startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
            startupinfo.wShowWindow = subprocess.SW_HIDE
        else:
            startupinfo = None
        self.logcat_process = subprocess.Popen(["./scrcpy-win64-v1.25/adb.exe", "-s", self.device, "logcat", "-s", "TBoxService "], stdout=open(logcat_file, "wb"), startupinfo=startupinfo)
        self.ocpplog_process = subprocess.Popen(["./scrcpy-win64-v1.25/adb.exe", "-s", self.device, "shell", "/oemapp/bin/ocppCli"], stdout=open(ocpplog_file, "wb"), startupinfo=startupinfo)
        if self.net_capture_enable.isChecked():
            # print("开始录制以太网报文")
            net_dict = self.initInterfaceDict()
            net_device = self.net_device_combo.currentText()
            net_id = net_dict[net_device]
            tshark_path = self.getConfigValue("tshark_path")
            cmd = f'{tshark_path} -i {net_id} -w "{netlog_file}"'
            print(cmd)
            self.net_capture_process = subprocess.Popen(cmd,startupinfo=startupinfo)
        self.record_button.setEnabled(False)
        self.stop_button.setEnabled(True)
        self.case_edit.setEnabled(False)
        self.disconnect_button.setEnabled(False)

    def stop_record(self):
        # 停止录制
        if self.logcat_process:
            self.logcat_process.terminate()
            self.logcat_process.wait()
            self.logcat_process = None
        if self.ocpplog_process:
            self.ocpplog_process.terminate()
            self.ocpplog_process.wait()
            self.ocpplog_process = None
        if self.net_capture_process:
            self.net_capture_process.terminate()
            self.net_capture_process.wait()
            self.net_capture_process = None
        self.record_button.setEnabled(True)
        self.stop_button.setEnabled(False)
        self.case_edit.setEnabled(True)
        self.disconnect_button.setEnabled(True)

    def get_logcat_file(self):
        # 获取logcat文件名
        base_name = "logcat_" + self.case_name + ".log"
        index = 1
        while os.path.exists(os.path.join(self.log_folder, base_name)):
            base_name = "logcat_" + self.case_name + "_" + str(index) + ".log"
            index += 1
        return base_name

    def get_ocpplog_file(self):
        # 获取ocpplog文件名
        base_name = "ocpplog_" + self.case_name + ".log"
        index = 1
        while os.path.exists(os.path.join(self.log_folder, base_name)):
            base_name = "ocpplog_" + self.case_name + "_" + str(index) + ".log"
            index += 1
        return base_name

    def get_netlog_file(self):
        # 获取netlog文件名
        base_name = "netlog_" + self.case_name + ".pcapng"
        index = 1
        while os.path.exists(os.path.join(self.log_folder, base_name)):
            base_name = "netlog_" + self.case_name + "_" + str(index) + ".pcapng"
            index += 1
        return base_name

    def open_ocpp(self):
        # os.system(' start cmd.exe /K  ./scrcpy-win64-v1.25/adb.exe -s b286526e shell /oemapp/bin/ocppCli')
        self.device = self.device_combo.currentText()
        self.process = subprocess.Popen(["./scrcpy-win64-v1.25/adb.exe", "-s", self.device, "shell", "/oemapp/bin/ocppCli"])

if __name__ == '__main__':
    app = QApplication(sys.argv)
    recorder = LogRecorder()
    recorder.show()
    sys.exit(app.exec_())

下一篇: Win10 Wsl2安装Ubuntu→

loading...