文章目录
- 1.项目地址
- 2.Ymodem 协议介绍
- 3.文件传输过程
- 4.使用
- 5.SecureCRT 软件也支持Ymodem
- 6.基于PyQt5的Ymodem界面实现案例
1.项目地址
https://github.com/XinLiGH/SerialPortYmodem
基于VS2019 Qt5.15.2 编译,Linux下编译也可以,这里不做说明。
2.Ymodem 协议介绍
YModem 协议是由 XModem 协议演变而来的,每包数据可以达到 1024 字节,是一个非常高效的文件传输协议。我们平常所说的 Ymodem 协议是指的 Ymodem-1K,除此还有 Ymodem-g(没有 CRC 校验,不常用)。YModem-1K 协议用 1024 字节数据帧传输取代了标准的 128 字节数据帧传输,发送的数据会使用 CRC 校验,保证数据传输的正确性。它每传输一个信息块时,就会等待接收端返回 ACK 信号,接收到响应信号后,才会继续传输下一个信息块,从而保证能够接收到全部数据。
3.文件传输过程
通常单片机程序是运行在主程序下的(单片机主程序不支持YModem),我这里由Qt上位机通过串口发送一条自定义的协议告诉单片机,进入升级模式。接着就是下图的流程:
4.使用
5.SecureCRT 软件也支持Ymodem
6.基于PyQt5的Ymodem界面实现案例
基于PyQt5做一些小工具真的很方便,大家感兴趣可以尝试下!
#coding=utf-8 from tkinter import * from tkinter.ttk import Combobox from tkinter import filedialog from tkinter import messagebox from ctypes import * import serial.tools.list_ports import threading from ymodem import YMODEM import os from time import sleep app = Tk() comportbox=Combobox(app,width = 7, height = 10) baudratebox=Combobox(app,width = 8, height = 10) red_canvas = Canvas(app, width=50, height=50,bg="red") green_canvas = Canvas(app, width=50, height=50,bg="green") progress_bar = Canvas(app, width = 350,height = 26,bg = "white") fill_line = progress_bar.create_rectangle(2,2,0,27,width = 0,fill = "green") ser = serial.Serial(bytesize=8, parity='N', stopbits=1, timeout=1, write_timeout=3) linsten_lock = threading.Lock() need_listen = 0 exit_listen = False def upgrade_callback(total_packets,file_size,file_name): if len(progress_bar.gettags("text")) == 0: progress_bar.create_text(175, 15, text=file_name, tags="text") progress = total_packets*350/(file_size/1024) progress_bar.coords(fill_line, (0, 0, progress, 30)) def set_connect_logo(is_connected = True): if is_connected: red_canvas.place_forget() green_canvas.place(x=100,y=60) comportbox.configure(state='disabled') baudratebox.configure(state='disabled') else: green_canvas.place_forget() red_canvas.place(x=100,y=60) comportbox.configure(state='enabled') baudratebox.configure(state='enabled') def listen_connect_thread(): global ser global need_listen global exit_listen global linsten_lock while exit_listen == False: linsten_lock.acquire() if need_listen == 0: linsten_lock.release() sleep(1) continue else: com_number = comportbox.get() port_found = 0 plist = list(serial.tools.list_ports.comports()) if len(plist) <= 0: if ser.is_open == True: ser.close() set_connect_logo(False) else: for item in plist: if com_number == item[0]: port_found = 1 break if port_found == 0: if ser.is_open == True: ser.close() set_connect_logo(False) else: if ser.is_open == False: try: ser.port = com_number ser.open() set_connect_logo(True) except Exception as e: print(e) pass linsten_lock.release() sleep(0.3) continue def connect(): global need_listen global ser com_number = comportbox.get() port_found = 0 plist = list(serial.tools.list_ports.comports()) if ser.is_open == True: messagebox.showinfo(title="Error", message="Already connected!") return if len(plist) <= 0: messagebox.showinfo(title="Error", message="No available serial!") need_listen = 0 set_connect_logo(False) return else: for item in plist: if com_number == item[0]: port_found = 1 break if port_found == 0: need_listen = 0 set_connect_logo(False) messagebox.showinfo(title="Error", message="Cannot find serial "+com_number) return try: ser.port = com_number ser.baudrate = 115200 #int(baud_rate) if ser.is_open == False: ser.open() except Exception as e: if ser.is_open == False: need_listen = 0 set_connect_logo(False) messagebox.showinfo(title="Error", message=e) return set_connect_logo(True) need_listen = 1 global listen_connect if listen_connect.is_alive() == False: listen_connect.start() def disconnect(): global ser global need_listen global linsten_lock linsten_lock.acquire() need_listen = 0 linsten_lock.release() #sleep(0.1) if ser.is_open == False: messagebox.showinfo(title="Error", message="Serial not connected!") else: try: ser.close() except Exception as e: messagebox.showinfo(title="Error", message=e) return need_listen = 0 set_connect_logo(False) def cancel(): global ymodem_sender if upgrade_button['state'] == 'disabled': ymodem_sender.abort() def upgrade(): global upgrade_button if ser.is_open == False: messagebox.showinfo(title="Error", message="Please connect the serial first!") upgrade_button.configure(state='active') return file_list = filedialog.askopenfilenames(filetypes=[("bin file", "*.bin"),("all","*.*")]) if len(file_list) <= 0: upgrade_button.configure(state='active') return else: ret_val = prepare_upgrade() if ret_val < 0: upgrade_button.configure(state='active') return upgrade_button.configure(state='disabled') disconnect_button.configure(state='disabled') upgrade_thread = threading.Thread(target=do_upgrade,args = (file_list,)) upgrade_thread.start() def show_progress_bar(show=True): if show: progress_bar.place(x=10,y=150) else: progress_bar.place_forget() progress_bar.coords(fill_line, (0, 0, 0, 30)) progress_bar.delete('text') def serial_reconnect(baud_rate = 115200, timeout = 1): need_sleep = 1 if ser.baudrate == baud_rate: need_sleep = 0 try: ser.timeout = timeout ser.baudrate = baud_rate ser.close() ser.open() except Exception as e: raise Exception("Reconnect Fail") if need_sleep: sleep(1) def do_upgrade(file_list): global upgrade_button need_listen = 0 sleep(1) baud_rate = baudratebox.get() try: serial_reconnect() except Exception as e: print(e) ser.write("\r".encode("utf-8")) ch_str = "upgrade -t 0 " + str(int(baud_rate)) + "\r" ser.write(ch_str.encode("utf-8")) sleep(1) for file in file_list: file_size = os.path.getsize(file) if file_size > 2*1024*1024: continue try: serial_reconnect(baud_rate = int(baud_rate), timeout = 5) except Exception as e: print(e) show_progress_bar(True) if len(progress_bar.gettags("text")) == 0: progress_bar.create_text(175, 15, text=os.path.basename(file), tags="text") ser.read_all() ser.write("\r".encode("utf-8")) ser.write("upgrade -u\r".encode("utf-8")) while True: ch_str = ser.read(4).decode("utf-8") if ch_str == "CCCC": break ymodem_send(file) while True: if ser.read(1).decode("utf-8") == 'I': if ser.read(1).decode("utf-8") == 'E': if ser.read(1).decode("utf-8") == 'T': ymodem_sender.log.info("Receive IET") show_progress_bar(False) sleep(1) break else: continue else: continue show_progress_bar(False) try: ser.write("\r".encode("utf-8")) ser.write("upgrade -t 0 115200\r".encode("utf-8")) sleep(1) serial_reconnect() except Exception as e: print(e) upgrade_button.configure(state='active') disconnect_button.configure(state='active') need_listen = 1 def ymodem_send(file): global ymodem_sender try: file_stream = open(file, 'rb') except IOError as e: raise Exception("Open file fail!") file_name = os.path.basename(file) file_size = os.path.getsize(file) rate = baudratebox.get() try: serial_reconnect(baud_rate = int(rate), timeout=5) except Exception as e: messagebox.showinfo(title="Error", message="Connection error!") return try: ymodem_sender.send(file_stream, file_name,file_size,callback=upgrade_callback) except Exception as e: file_stream.close() raise file_stream.close() def prepare_upgrade(): global ser ser.flushOutput() ser.flushInput() ser.write("\r".encode("utf-8")) ret_str = ser.read(1024).decode("utf-8") b_reset= False if ret_str.find("IET") == -1: try: serial_reconnect(baud_rate = 9600) b_reset = True except Exception as e: messagebox.showinfo(title="Error", message=e) return -1 ser.read_all() ser.write("flash -u\r".encode("utf-8")) sleep(0.5) read_byte = ser.read_all() if len(read_byte) <= 3: b_reset = True else: ret_str = read_byte[0:min(20,len(read_byte))].decode("utf-8") if ret_str.find("IET") != -1: b_reset = True else: b_reset = False if b_reset: messagebox.showinfo(title="Tips", message="Please reset/reconnect board first, then press [OK]") sleep(0.5) serial_reconnect() return 0 def sender_getc(size): return ser.read(size) or None def sender_putc(data): send_data_mutex.acquire() ser.write(data) send_data_mutex.release() connect_button = Button(app, text="连接",width = 8,height = 1,command=connect) disconnect_button = Button(app, text="断开",width = 8,height = 1,command=disconnect) upgrade_button = Button(app, text="升级",width = 8,height = 1,command=upgrade) cancel_button = Button(app, text="取消升级",width = 8,height = 1,command=cancel) listen_connect = threading.Thread(target=listen_connect_thread) send_data_mutex = threading.Lock() ymodem_sender = YMODEM(sender_getc, sender_putc) def init_layout(): app.title("xx科技") app.iconbitmap(".\\xxxx.ico") app.geometry('400x200') app.protocol("WM_DELETE_WINDOW", on_closing) app.resizable(width=False, height=False) Label(app, text="端口:", font=('Arial', 14)).place(x=10, y=10) comportbox["value"] = ("COM1","COM2","COM3","COM4","COM5","COM6","COM7","COM8","COM9","COM10") comportbox.current(2) comportbox.place(x=75, y=14) Label(app, text="波特率:", font=('Arial', 14)).place(x=200, y=10) baudratebox["value"] = ("9600","115200","230400","576000") baudratebox.current(1) baudratebox.place(x=285, y=14) connect_button.place(x=10,y=50) disconnect_button.place(x=10,y=100) cancel_button.place(x=285,y=50) upgrade_button.place(x=285,y=100) show_progress_bar(False) set_connect_logo(False) def main(): init_layout() mainloop() def on_closing(): exit_listen = True sleep(0.4) try: ser.close() except Exception as e: print(e) pass app.destroy() if __name__ == '__main__': main()
# -*- coding: utf-8 -*- import logging from time import sleep logging.basicConfig(level = logging.DEBUG, format = '%(asctime)s - %(levelname)s - %(message)s') # ymodem data header byte SOH = b'\x01' STX = b'\x02' EOT = b'\x04' ACK = b'\x06' NAK = b'\x15' CAN = b'\x18' CRC = b'C' class YMODEM(object): # initialize def __init__(self, getc, putc, mode='ymodem', header_pad=b'\x00', pad=b'\x1a'): self.getc = getc self.putc = putc self.mode = mode self.header_pad = header_pad self.pad = pad self.log = logging.getLogger('YReporter') # send abort(CAN) twice def abort(self, count=2): for _ in range(count): self.putc(CAN) ''' send entry ''' def send(self, file_stream, file_name, file_size=0, retry=20, timeout=15, callback=None): try: packet_size = dict( ymodem = 1024, ymodem128 = 128 )[self.mode] except KeyError: raise ValueError("Invalid mode specified: {self.mode!r}".format(self=self)) self.log.debug('Begin start sequence') # Receive first character error_count = 0 cancel = 0 while True: char = self.getc(1) if char: if char == CRC: # Expected CRC self.log.info("<<< CRC") break elif char == CAN: self.log.info("<<< CAN") if cancel: return False else: cancel = 1 else: self.log.error("send error, expected CRC or CAN, but got " + hex(ord(char))) error_count += 1 if error_count > retry: self.abort() self.log.error("send error: error_count reached %d aborting", retry) return False header = self._make_send_header(128, 0) name = bytes(file_name, encoding = "utf8") size = bytes(str(file_size),encoding = "utf8") data = name + b'\x00' + size + b'\x20' data = data.ljust(128, self.header_pad) checksum = self._make_send_checksum(data) data_for_send = header + data + checksum self.putc(data_for_send) #self.log.info("Packet 0 >>> " + str(len(data_for_send))) error_count = 0 cancel = 0 # Receive reponse of first packet while True: char = self.getc(1) if char: if char == ACK: self.log.info("<<< ACK") char2 = self.getc(1) if char2 == CRC: self.log.info("<<< CRC") break else: self.log.warn("ACK wasnt CRCd") break elif char == CAN: self.log.info("<<< CAN") if cancel: return False else: cancel = 1 else: if (ord(char)) >= 0x20 and (ord(char)) <= 0x7e: self.log.error("test" + str(char)) else: self.log.error("send error, expected ACK or CAN, but got " + hex(ord(char))) error_count = 0 cancel = 0 total_packets = 1 sequence = 1 sleep(1) while True: # Read raw data from file stream data = file_stream.read(packet_size) if not data: self.log.debug('send: at EOF') break total_packets += 1 header = self._make_send_header(packet_size, sequence) data = data.ljust(packet_size, self.pad) checksum = self._make_send_checksum(data) while True: data_for_send = header + data + checksum # data_in_hexstring = "".join("%02x" % b for b in data_for_send) # Send file data packet self.putc(data_for_send) #self.log.info("Packet " + str(sequence) + " >>>" + str(len(data_for_send))) error_count = 0 while True: char = self.getc(1) if char == ACK: break else: error_count += 1 if error_count > retry: self.abort() return False error_count = 0 if char == ACK: # Expected response #self.log.info("<<< ACK") if callable(callback): callback(total_packets,file_size,file_name) error_count = 0 break error_count += 1 if error_count > retry: self.abort() self.log.error('send error: NAK received %d , aborting', retry) return False sequence = (sequence + 1) % 0x100 # Send EOT and expect final ACK error_count = 0 while True: self.putc(EOT) self.log.info(">>> EOT") char = self.getc(1) if char == ACK: self.log.info("<<< ACK") break else: error_count += 1 if error_count > retry: self.abort() self.log.warn('EOT wasnt ACKd, aborting transfer') return False header = self._make_send_header(128, 0) data = bytearray(b'\x00') data = data.ljust(128, self.header_pad) checksum = self._make_send_checksum(data) data_for_send = header + data + checksum self.putc(data_for_send) error_count = 0 while True: char = self.getc(1) if char == ACK : break else: error_count += 1 if error_count > retry: self.abort() self.log.warn('SOH wasnt ACK, aborting transfer') return False self.log.info('Transmission successful (ACK received)') return True # Header byte def _make_send_header(self, packet_size, sequence): assert packet_size in (128, 1024), packet_size _bytes = [] if packet_size == 128: _bytes.append(ord(SOH)) elif packet_size == 1024: _bytes.append(ord(STX)) _bytes.extend([sequence, 0xff - sequence]) return bytearray(_bytes) # Make check code def _make_send_checksum(self, data): _bytes = [] crc = self.calc_crc(data) _bytes.extend([crc >> 8, crc & 0xff]) return bytearray(_bytes) def _verify_recv_checksum(self, data): _checksum = bytearray(data[-2:]) their_sum = (_checksum[0] << 8) + _checksum[1] data = data[:-2] our_sum = self.calc_crc(data) valid = bool(their_sum == our_sum) return valid, data # For CRC algorithm crctable = [ 0x0000, 0x1021, 0x2042, 0x3063, 0x4084, 0x50a5, 0x60c6, 0x70e7, 0x8108, 0x9129, 0xa14a, 0xb16b, 0xc18c, 0xd1ad, 0xe1ce, 0xf1ef, 0x1231, 0x0210, 0x3273, 0x2252, 0x52b5, 0x4294, 0x72f7, 0x62d6, 0x9339, 0x8318, 0xb37b, 0xa35a, 0xd3bd, 0xc39c, 0xf3ff, 0xe3de, 0x2462, 0x3443, 0x0420, 0x1401, 0x64e6, 0x74c7, 0x44a4, 0x5485, 0xa56a, 0xb54b, 0x8528, 0x9509, 0xe5ee, 0xf5cf, 0xc5ac, 0xd58d, 0x3653, 0x2672, 0x1611, 0x0630, 0x76d7, 0x66f6, 0x5695, 0x46b4, 0xb75b, 0xa77a, 0x9719, 0x8738, 0xf7df, 0xe7fe, 0xd79d, 0xc7bc, 0x48c4, 0x58e5, 0x6886, 0x78a7, 0x0840, 0x1861, 0x2802, 0x3823, 0xc9cc, 0xd9ed, 0xe98e, 0xf9af, 0x8948, 0x9969, 0xa90a, 0xb92b, 0x5af5, 0x4ad4, 0x7ab7, 0x6a96, 0x1a71, 0x0a50, 0x3a33, 0x2a12, 0xdbfd, 0xcbdc, 0xfbbf, 0xeb9e, 0x9b79, 0x8b58, 0xbb3b, 0xab1a, 0x6ca6, 0x7c87, 0x4ce4, 0x5cc5, 0x2c22, 0x3c03, 0x0c60, 0x1c41, 0xedae, 0xfd8f, 0xcdec, 0xddcd, 0xad2a, 0xbd0b, 0x8d68, 0x9d49, 0x7e97, 0x6eb6, 0x5ed5, 0x4ef4, 0x3e13, 0x2e32, 0x1e51, 0x0e70, 0xff9f, 0xefbe, 0xdfdd, 0xcffc, 0xbf1b, 0xaf3a, 0x9f59, 0x8f78, 0x9188, 0x81a9, 0xb1ca, 0xa1eb, 0xd10c, 0xc12d, 0xf14e, 0xe16f, 0x1080, 0x00a1, 0x30c2, 0x20e3, 0x5004, 0x4025, 0x7046, 0x6067, 0x83b9, 0x9398, 0xa3fb, 0xb3da, 0xc33d, 0xd31c, 0xe37f, 0xf35e, 0x02b1, 0x1290, 0x22f3, 0x32d2, 0x4235, 0x5214, 0x6277, 0x7256, 0xb5ea, 0xa5cb, 0x95a8, 0x8589, 0xf56e, 0xe54f, 0xd52c, 0xc50d, 0x34e2, 0x24c3, 0x14a0, 0x0481, 0x7466, 0x6447, 0x5424, 0x4405, 0xa7db, 0xb7fa, 0x8799, 0x97b8, 0xe75f, 0xf77e, 0xc71d, 0xd73c, 0x26d3, 0x36f2, 0x0691, 0x16b0, 0x6657, 0x7676, 0x4615, 0x5634, 0xd94c, 0xc96d, 0xf90e, 0xe92f, 0x99c8, 0x89e9, 0xb98a, 0xa9ab, 0x5844, 0x4865, 0x7806, 0x6827, 0x18c0, 0x08e1, 0x3882, 0x28a3, 0xcb7d, 0xdb5c, 0xeb3f, 0xfb1e, 0x8bf9, 0x9bd8, 0xabbb, 0xbb9a, 0x4a75, 0x5a54, 0x6a37, 0x7a16, 0x0af1, 0x1ad0, 0x2ab3, 0x3a92, 0xfd2e, 0xed0f, 0xdd6c, 0xcd4d, 0xbdaa, 0xad8b, 0x9de8, 0x8dc9, 0x7c26, 0x6c07, 0x5c64, 0x4c45, 0x3ca2, 0x2c83, 0x1ce0, 0x0cc1, 0xef1f, 0xff3e, 0xcf5d, 0xdf7c, 0xaf9b, 0xbfba, 0x8fd9, 0x9ff8, 0x6e17, 0x7e36, 0x4e55, 0x5e74, 0x2e93, 0x3eb2, 0x0ed1, 0x1ef0, ] # CRC algorithm: CCITT-0 def calc_crc(self, data, crc=0): for char in bytearray(data): crctbl_idx = ((crc >> 8) ^ char) & 0xff crc = ((crc << 8) ^ self.crctable[crctbl_idx]) & 0xffff return crc & 0xffff if __name__ == '__main__': pass
参考:
https://blog.csdn.net/u012993936/article/details/125102816
https://blog.csdn.net/gitblog_00099/article/details/137135096
https://blog.csdn.net/xuyinxin/article/details/104174587
https://github.com/alexwoo1900/ymodem.git
还没有评论,来说两句吧...