分享一个基于Qt的Ymodem的上位机(GitHub开源)

分享一个基于Qt的Ymodem的上位机(GitHub开源)

码农世界 2024-05-15 前端 61 次浏览 0个评论

文章目录

  • 1.项目地址
  • 2.Ymodem 协议介绍
  • 3.文件传输过程
  • 4.使用
  • 5.SecureCRT 软件也支持Ymodem
  • 6.基于PyQt5的Ymodem界面实现案例

    1.项目地址

    https://github.com/XinLiGH/SerialPortYmodem

    基于VS2019 Qt5.15.2 编译,Linux下编译也可以,这里不做说明。

    2.Ymodem 协议介绍

    YModem 协议是由 XModem 协议演变而来的,每包数据可以达到 1024 字节,是一个非常高效的文件传输协议。我们平常所说的 Ymodem 协议是指的 Ymodem-1K,除此还有 Ymodem-g(没有 CRC 校验,不常用)。YModem-1K 协议用 1024 字节数据帧传输取代了标准的 128 字节数据帧传输,发送的数据会使用 CRC 校验,保证数据传输的正确性。它每传输一个信息块时,就会等待接收端返回 ACK 信号,接收到响应信号后,才会继续传输下一个信息块,从而保证能够接收到全部数据。

    3.文件传输过程

    通常单片机程序是运行在主程序下的(单片机主程序不支持YModem),我这里由Qt上位机通过串口发送一条自定义的协议告诉单片机,进入升级模式。接着就是下图的流程:

    4.使用

    5.SecureCRT 软件也支持Ymodem

    6.基于PyQt5的Ymodem界面实现案例

    基于PyQt5做一些小工具真的很方便,大家感兴趣可以尝试下!

    #coding=utf-8
     
    from tkinter import *
    from tkinter.ttk import Combobox
    from tkinter import filedialog
    from tkinter import messagebox
    from ctypes import *
    import serial.tools.list_ports
    import threading
    from ymodem import YMODEM
    import os
    from time import sleep
     
    app = Tk()
    comportbox=Combobox(app,width = 7, height = 10)
    baudratebox=Combobox(app,width = 8, height = 10)
    red_canvas = Canvas(app, width=50, height=50,bg="red")
    green_canvas = Canvas(app, width=50, height=50,bg="green")
    progress_bar = Canvas(app, width = 350,height = 26,bg = "white")
    fill_line = progress_bar.create_rectangle(2,2,0,27,width = 0,fill = "green") 
    ser = serial.Serial(bytesize=8, parity='N', stopbits=1, timeout=1, write_timeout=3)
    linsten_lock = threading.Lock()
    need_listen = 0
    exit_listen = False
     
    def upgrade_callback(total_packets,file_size,file_name):
        if len(progress_bar.gettags("text")) == 0:
            progress_bar.create_text(175, 15, text=file_name, tags="text")
        progress = total_packets*350/(file_size/1024)
        progress_bar.coords(fill_line, (0, 0, progress, 30))
     
    def set_connect_logo(is_connected = True):
        if is_connected:
            red_canvas.place_forget()
            green_canvas.place(x=100,y=60)
            comportbox.configure(state='disabled')
            baudratebox.configure(state='disabled')
        else:
            green_canvas.place_forget()
            red_canvas.place(x=100,y=60)
            comportbox.configure(state='enabled')
            baudratebox.configure(state='enabled')
     
     
    def listen_connect_thread():
        global ser
        global need_listen
        global exit_listen
        global linsten_lock
        while exit_listen == False:
            linsten_lock.acquire()
            if need_listen == 0:
                linsten_lock.release()
                sleep(1)
                continue
            else:
                com_number = comportbox.get()
                port_found = 0
                plist = list(serial.tools.list_ports.comports())
            
                if len(plist) <= 0:
                    if ser.is_open == True:
                        ser.close()
                    set_connect_logo(False)
                else:
                    for item in plist:
                        if com_number == item[0]:
                            port_found = 1
                            break
                    if port_found == 0:
                        if ser.is_open == True:
                            ser.close()
                        set_connect_logo(False)
                    else:
                        if ser.is_open == False:
                            try:
                                ser.port = com_number
                                ser.open()
                                set_connect_logo(True)
                            except Exception as e:
                                print(e)
                                pass
                linsten_lock.release()
                sleep(0.3)
                continue
     
    def connect():
        global need_listen
        global ser
        com_number = comportbox.get()
        
        port_found = 0
        plist = list(serial.tools.list_ports.comports())
        
        if ser.is_open == True:
            messagebox.showinfo(title="Error", message="Already connected!")
            return
     
        if len(plist) <= 0:
            messagebox.showinfo(title="Error", message="No available serial!")
            need_listen = 0
            set_connect_logo(False)
            return
        else:
            for item in plist:
                if com_number == item[0]:
                    port_found = 1
                    break
            if port_found == 0:
                need_listen = 0
                set_connect_logo(False)
                messagebox.showinfo(title="Error", message="Cannot find serial "+com_number)
                return
        try:
            ser.port = com_number
            ser.baudrate = 115200 #int(baud_rate)
            if ser.is_open == False:
                ser.open()
        except Exception as e:
            if ser.is_open == False:
                need_listen = 0
                set_connect_logo(False)
            messagebox.showinfo(title="Error", message=e)
            return
     
        set_connect_logo(True)
        need_listen = 1
        
        global listen_connect
        if listen_connect.is_alive() == False:
            listen_connect.start()
     
        
    def disconnect():
        global ser
        global need_listen
        global linsten_lock
        linsten_lock.acquire()
        need_listen = 0
        linsten_lock.release()
        #sleep(0.1)
     
        if ser.is_open == False:
            messagebox.showinfo(title="Error", message="Serial not connected!")
        else:
            try:
                ser.close()
            except Exception as e:
                messagebox.showinfo(title="Error", message=e)
                return
        need_listen = 0
        set_connect_logo(False)
     
    def cancel():
        global ymodem_sender
        if upgrade_button['state'] == 'disabled':
            ymodem_sender.abort()
     
    def upgrade():
        global upgrade_button
        if ser.is_open == False:
            messagebox.showinfo(title="Error", message="Please connect the serial first!")
            upgrade_button.configure(state='active')
            return
        file_list = filedialog.askopenfilenames(filetypes=[("bin file", "*.bin"),("all","*.*")])
        if len(file_list) <= 0:
            upgrade_button.configure(state='active')
            return
        else:
            ret_val = prepare_upgrade()
        if ret_val < 0:
            upgrade_button.configure(state='active')
            return
        
        upgrade_button.configure(state='disabled')
        disconnect_button.configure(state='disabled')
        upgrade_thread = threading.Thread(target=do_upgrade,args = (file_list,))
        upgrade_thread.start()
     
    def show_progress_bar(show=True):
        if show:
            progress_bar.place(x=10,y=150)
        else:
            progress_bar.place_forget()
            progress_bar.coords(fill_line, (0, 0, 0, 30))
            progress_bar.delete('text')
     
    def serial_reconnect(baud_rate = 115200, timeout = 1):
        need_sleep = 1
        if  ser.baudrate == baud_rate:
            need_sleep = 0
        try:
            ser.timeout = timeout
            ser.baudrate = baud_rate
            ser.close()
            ser.open()
        except Exception as e:
            raise Exception("Reconnect Fail")
        if need_sleep:
            sleep(1)
        
    def do_upgrade(file_list):
        global upgrade_button
        need_listen = 0 
        sleep(1)
        baud_rate = baudratebox.get()
        try:
            serial_reconnect()
        except Exception as e:
            print(e)
        ser.write("\r".encode("utf-8"))
        ch_str = "upgrade -t 0 " + str(int(baud_rate)) + "\r"
        ser.write(ch_str.encode("utf-8"))
        sleep(1)
     
        for file in file_list:
            file_size = os.path.getsize(file)
            if file_size > 2*1024*1024:
                continue
            try:
                serial_reconnect(baud_rate = int(baud_rate), timeout = 5)
            except Exception as e:
                print(e)
            show_progress_bar(True)
            if len(progress_bar.gettags("text")) == 0:
                progress_bar.create_text(175, 15, text=os.path.basename(file), tags="text")
            ser.read_all()
            ser.write("\r".encode("utf-8"))
            ser.write("upgrade -u\r".encode("utf-8"))
            
            while True:
                ch_str = ser.read(4).decode("utf-8")
                if ch_str == "CCCC":
                    break
            ymodem_send(file)
            while True:
                if ser.read(1).decode("utf-8") == 'I':
                    if ser.read(1).decode("utf-8") == 'E':
                        if ser.read(1).decode("utf-8") == 'T':
                            ymodem_sender.log.info("Receive IET")
                            show_progress_bar(False)
                            sleep(1)
                            break
                        else:
                            continue
                    else:
                        continue
        
        show_progress_bar(False)
        try:
            ser.write("\r".encode("utf-8"))
            ser.write("upgrade -t 0 115200\r".encode("utf-8"))
            sleep(1)
            serial_reconnect()
        except Exception as e:
            print(e)
        upgrade_button.configure(state='active')
        disconnect_button.configure(state='active')
        need_listen = 1
        
    def ymodem_send(file):
        global ymodem_sender
        try:
            file_stream = open(file, 'rb')
        except IOError as e:
            raise Exception("Open file fail!")
        file_name = os.path.basename(file)
        file_size = os.path.getsize(file)
        
        rate = baudratebox.get()
        
        try:
            serial_reconnect(baud_rate = int(rate), timeout=5)
        except Exception as e:
            messagebox.showinfo(title="Error", message="Connection error!")
            return
     
        try:
            ymodem_sender.send(file_stream, file_name,file_size,callback=upgrade_callback)
        except Exception as e:
            file_stream.close()
            raise
        file_stream.close()
      
    def prepare_upgrade():
        global ser
        ser.flushOutput()
        ser.flushInput()
        ser.write("\r".encode("utf-8"))
        ret_str = ser.read(1024).decode("utf-8")
        
        b_reset= False
        if ret_str.find("IET") == -1:
            try:
                serial_reconnect(baud_rate = 9600)
                b_reset = True
            except Exception as e:
                messagebox.showinfo(title="Error", message=e)
                return -1
        ser.read_all()
        ser.write("flash -u\r".encode("utf-8"))
        sleep(0.5)
        read_byte = ser.read_all()
        
        if len(read_byte) <= 3:
            b_reset = True
        else:
            ret_str = read_byte[0:min(20,len(read_byte))].decode("utf-8")
            if ret_str.find("IET") != -1:
                b_reset = True
            else:
                b_reset = False
        if b_reset:
            messagebox.showinfo(title="Tips", message="Please reset/reconnect board first, then press [OK]")
            sleep(0.5)
            serial_reconnect()
        return 0
        
    def sender_getc(size):
        return ser.read(size) or None
     
    def sender_putc(data):
        send_data_mutex.acquire()
        ser.write(data)
        send_data_mutex.release()
     
     
    connect_button = Button(app, text="连接",width = 8,height = 1,command=connect)
    disconnect_button = Button(app, text="断开",width = 8,height = 1,command=disconnect)
    upgrade_button = Button(app, text="升级",width = 8,height = 1,command=upgrade)
    cancel_button = Button(app, text="取消升级",width = 8,height = 1,command=cancel) 
    listen_connect = threading.Thread(target=listen_connect_thread)
    send_data_mutex = threading.Lock()
    ymodem_sender = YMODEM(sender_getc, sender_putc)
     
     
     
    def init_layout():
        app.title("xx科技")
        app.iconbitmap(".\\xxxx.ico")
        app.geometry('400x200') 
        app.protocol("WM_DELETE_WINDOW", on_closing)
        app.resizable(width=False, height=False)
     
        Label(app, text="端口:", font=('Arial', 14)).place(x=10, y=10)
     
        comportbox["value"] = ("COM1","COM2","COM3","COM4","COM5","COM6","COM7","COM8","COM9","COM10")
        comportbox.current(2)
        comportbox.place(x=75, y=14)
        
        Label(app, text="波特率:", font=('Arial', 14)).place(x=200, y=10)
     
        baudratebox["value"] = ("9600","115200","230400","576000")
        baudratebox.current(1)
        baudratebox.place(x=285, y=14)
        
        connect_button.place(x=10,y=50)
        disconnect_button.place(x=10,y=100)
        cancel_button.place(x=285,y=50)
        upgrade_button.place(x=285,y=100)
        
        show_progress_bar(False)
        
        set_connect_logo(False)
     
    def main():
        init_layout()
        mainloop()
     
    def on_closing():
        exit_listen = True
        sleep(0.4)
        try:
            ser.close()
        except Exception as e:
            print(e)
            pass 
        app.destroy()
     
    if __name__ == '__main__':
        main()
    
    # -*- coding: utf-8 -*-
     
    import logging
    from time import sleep
    logging.basicConfig(level = logging.DEBUG, format = '%(asctime)s - %(levelname)s - %(message)s')
     
    # ymodem data header byte
    SOH = b'\x01'
    STX = b'\x02'
    EOT = b'\x04'
    ACK = b'\x06'
    NAK = b'\x15'
    CAN = b'\x18'
    CRC = b'C'
     
    class YMODEM(object):
     
        # initialize
        def __init__(self, getc, putc, mode='ymodem', header_pad=b'\x00', pad=b'\x1a'):
            self.getc = getc
            self.putc = putc
            self.mode = mode
            self.header_pad = header_pad
            self.pad = pad
            self.log = logging.getLogger('YReporter')
     
        # send abort(CAN) twice
        def abort(self, count=2):
            for _ in range(count):
                self.putc(CAN)
             
        '''
        send entry
        '''
        def send(self, file_stream, file_name, file_size=0, retry=20, timeout=15, callback=None):
            try:
                packet_size = dict(
                    ymodem = 1024,
                    ymodem128 = 128
                )[self.mode]
            except KeyError:
                raise ValueError("Invalid mode specified: {self.mode!r}".format(self=self))
     
            self.log.debug('Begin start sequence')
     
            # Receive first character
            error_count = 0
            cancel = 0
            while True:
                char = self.getc(1)
                if char:
                    if char == CRC:
                        # Expected CRC
                        self.log.info("<<< CRC")
                        break
                    elif char == CAN:
                        self.log.info("<<< CAN")
                        if cancel:
                            return False
                        else:
                            cancel = 1
                    else:
                        self.log.error("send error, expected CRC or CAN, but got " + hex(ord(char)))
     
                error_count += 1
                if error_count > retry:
                    self.abort()
                    self.log.error("send error: error_count reached %d aborting", retry)
                    return False
     
            header = self._make_send_header(128, 0)
            name = bytes(file_name, encoding = "utf8")
            
            size = bytes(str(file_size),encoding = "utf8")
            
            data = name + b'\x00' + size + b'\x20'
     
            data = data.ljust(128, self.header_pad)
            
            checksum = self._make_send_checksum(data)
            data_for_send = header + data + checksum
            self.putc(data_for_send)
            #self.log.info("Packet 0 >>> " + str(len(data_for_send)))
     
            error_count = 0
            cancel = 0
            # Receive reponse of first packet
            while True:
                char = self.getc(1)
                if char:
                    if char == ACK:
                        self.log.info("<<< ACK")
                        char2 = self.getc(1)
                        if char2 == CRC:
                            self.log.info("<<< CRC")
                            break
                        else:
                            self.log.warn("ACK wasnt CRCd")
                            break
                    elif char == CAN:
                        self.log.info("<<< CAN")
                        if cancel:
                            return False
                        else:
                            cancel = 1
                    else:
                        if (ord(char)) >= 0x20 and (ord(char)) <= 0x7e:
                            self.log.error("test" + str(char))
                        else:
                            self.log.error("send error, expected ACK or CAN, but got " + hex(ord(char)))
            
            error_count = 0
            cancel = 0
            total_packets = 1
            sequence = 1
            sleep(1)
            while True:
                # Read raw data from file stream
                data = file_stream.read(packet_size)
                if not data:
                    self.log.debug('send: at EOF')
                    break
                total_packets += 1
     
                header = self._make_send_header(packet_size, sequence)
                data = data.ljust(packet_size, self.pad)
                checksum = self._make_send_checksum(data)
     
                while True:
                    data_for_send = header + data + checksum
                    # data_in_hexstring = "".join("%02x" % b for b in data_for_send)
     
                    # Send file data packet
                    self.putc(data_for_send)
                    #self.log.info("Packet " + str(sequence) + " >>>" + str(len(data_for_send)))
                    error_count = 0
                    while True:
                        char = self.getc(1)
                        if char == ACK:
                            break
                        else:
                            error_count += 1
                        if error_count > retry:
                            self.abort()
                            return False
                    
                    error_count = 0
                    if char == ACK:
                        # Expected response
                        #self.log.info("<<< ACK")
                        if callable(callback):
                            callback(total_packets,file_size,file_name)
                        error_count = 0
                        break
                    
                    error_count += 1
     
                    if error_count > retry:
                        self.abort()
                        self.log.error('send error: NAK received %d , aborting', retry)
                        return False
     
                sequence = (sequence + 1) % 0x100
     
            # Send EOT and expect final ACK
            error_count = 0
            while True:
                self.putc(EOT)
                self.log.info(">>> EOT")
                char = self.getc(1)
                if char == ACK:
                    self.log.info("<<< ACK")
                    break
                else:
                    error_count += 1
                    if error_count > retry:
                        self.abort()
                        self.log.warn('EOT wasnt ACKd, aborting transfer')
                        return False
     
            header = self._make_send_header(128, 0)
            
            data = bytearray(b'\x00')
     
            data = data.ljust(128, self.header_pad)
            
            checksum = self._make_send_checksum(data)
            data_for_send = header + data + checksum
            self.putc(data_for_send)
            
            error_count = 0
            while True:
                char = self.getc(1)
                if char == ACK :
                    break
                else:
                    error_count += 1
                    if error_count > retry:
                        self.abort()
                        self.log.warn('SOH wasnt ACK, aborting transfer')
                        return False
            
            self.log.info('Transmission successful (ACK received)')
            return True
     
        # Header byte
        def _make_send_header(self, packet_size, sequence):
            assert packet_size in (128, 1024), packet_size
            _bytes = []
            if packet_size == 128:
                _bytes.append(ord(SOH))
            elif packet_size == 1024:
                    _bytes.append(ord(STX))
            _bytes.extend([sequence, 0xff - sequence])
            return bytearray(_bytes)
     
        # Make check code
        def _make_send_checksum(self, data):
            _bytes = []
            crc = self.calc_crc(data)
            _bytes.extend([crc >> 8, crc & 0xff])
            return bytearray(_bytes)
     
        def _verify_recv_checksum(self, data):
            _checksum = bytearray(data[-2:])
            their_sum = (_checksum[0] << 8) + _checksum[1]
            data = data[:-2]
     
            our_sum = self.calc_crc(data)
            valid = bool(their_sum == our_sum)
            return valid, data
     
        # For CRC algorithm
        crctable = [
            0x0000, 0x1021, 0x2042, 0x3063, 0x4084, 0x50a5, 0x60c6, 0x70e7,
            0x8108, 0x9129, 0xa14a, 0xb16b, 0xc18c, 0xd1ad, 0xe1ce, 0xf1ef,
            0x1231, 0x0210, 0x3273, 0x2252, 0x52b5, 0x4294, 0x72f7, 0x62d6,
            0x9339, 0x8318, 0xb37b, 0xa35a, 0xd3bd, 0xc39c, 0xf3ff, 0xe3de,
            0x2462, 0x3443, 0x0420, 0x1401, 0x64e6, 0x74c7, 0x44a4, 0x5485,
            0xa56a, 0xb54b, 0x8528, 0x9509, 0xe5ee, 0xf5cf, 0xc5ac, 0xd58d,
            0x3653, 0x2672, 0x1611, 0x0630, 0x76d7, 0x66f6, 0x5695, 0x46b4,
            0xb75b, 0xa77a, 0x9719, 0x8738, 0xf7df, 0xe7fe, 0xd79d, 0xc7bc,
            0x48c4, 0x58e5, 0x6886, 0x78a7, 0x0840, 0x1861, 0x2802, 0x3823,
            0xc9cc, 0xd9ed, 0xe98e, 0xf9af, 0x8948, 0x9969, 0xa90a, 0xb92b,
            0x5af5, 0x4ad4, 0x7ab7, 0x6a96, 0x1a71, 0x0a50, 0x3a33, 0x2a12,
            0xdbfd, 0xcbdc, 0xfbbf, 0xeb9e, 0x9b79, 0x8b58, 0xbb3b, 0xab1a,
            0x6ca6, 0x7c87, 0x4ce4, 0x5cc5, 0x2c22, 0x3c03, 0x0c60, 0x1c41,
            0xedae, 0xfd8f, 0xcdec, 0xddcd, 0xad2a, 0xbd0b, 0x8d68, 0x9d49,
            0x7e97, 0x6eb6, 0x5ed5, 0x4ef4, 0x3e13, 0x2e32, 0x1e51, 0x0e70,
            0xff9f, 0xefbe, 0xdfdd, 0xcffc, 0xbf1b, 0xaf3a, 0x9f59, 0x8f78,
            0x9188, 0x81a9, 0xb1ca, 0xa1eb, 0xd10c, 0xc12d, 0xf14e, 0xe16f,
            0x1080, 0x00a1, 0x30c2, 0x20e3, 0x5004, 0x4025, 0x7046, 0x6067,
            0x83b9, 0x9398, 0xa3fb, 0xb3da, 0xc33d, 0xd31c, 0xe37f, 0xf35e,
            0x02b1, 0x1290, 0x22f3, 0x32d2, 0x4235, 0x5214, 0x6277, 0x7256,
            0xb5ea, 0xa5cb, 0x95a8, 0x8589, 0xf56e, 0xe54f, 0xd52c, 0xc50d,
            0x34e2, 0x24c3, 0x14a0, 0x0481, 0x7466, 0x6447, 0x5424, 0x4405,
            0xa7db, 0xb7fa, 0x8799, 0x97b8, 0xe75f, 0xf77e, 0xc71d, 0xd73c,
            0x26d3, 0x36f2, 0x0691, 0x16b0, 0x6657, 0x7676, 0x4615, 0x5634,
            0xd94c, 0xc96d, 0xf90e, 0xe92f, 0x99c8, 0x89e9, 0xb98a, 0xa9ab,
            0x5844, 0x4865, 0x7806, 0x6827, 0x18c0, 0x08e1, 0x3882, 0x28a3,
            0xcb7d, 0xdb5c, 0xeb3f, 0xfb1e, 0x8bf9, 0x9bd8, 0xabbb, 0xbb9a,
            0x4a75, 0x5a54, 0x6a37, 0x7a16, 0x0af1, 0x1ad0, 0x2ab3, 0x3a92,
            0xfd2e, 0xed0f, 0xdd6c, 0xcd4d, 0xbdaa, 0xad8b, 0x9de8, 0x8dc9,
            0x7c26, 0x6c07, 0x5c64, 0x4c45, 0x3ca2, 0x2c83, 0x1ce0, 0x0cc1,
            0xef1f, 0xff3e, 0xcf5d, 0xdf7c, 0xaf9b, 0xbfba, 0x8fd9, 0x9ff8,
            0x6e17, 0x7e36, 0x4e55, 0x5e74, 0x2e93, 0x3eb2, 0x0ed1, 0x1ef0,
        ]
     
        # CRC algorithm: CCITT-0
        def calc_crc(self, data, crc=0):
            for char in bytearray(data):
                crctbl_idx = ((crc >> 8) ^ char) & 0xff
                crc = ((crc << 8) ^ self.crctable[crctbl_idx]) & 0xffff
            return crc & 0xffff
     
     
     
    if __name__ == '__main__':
        pass
    

    参考:

    https://blog.csdn.net/u012993936/article/details/125102816

    https://blog.csdn.net/gitblog_00099/article/details/137135096

    https://blog.csdn.net/xuyinxin/article/details/104174587

    https://github.com/alexwoo1900/ymodem.git

转载请注明来自码农世界,本文标题:《分享一个基于Qt的Ymodem的上位机(GitHub开源)》

百度分享代码,如果开启HTTPS请参考李洋个人博客
每一天,每一秒,你所做的决定都会改变你的人生!

发表评论

快捷回复:

评论列表 (暂无评论,61人围观)参与讨论

还没有评论,来说两句吧...

Top