最新消息:雨落星辰是一个专注网站SEO优化、网站SEO诊断、搜索引擎研究、网络营销推广、网站策划运营及站长类的自媒体原创博客

pyserial - Problem with sending data over serial port with Python - Stack Overflow

programmeradmin2浏览0评论

I am currently working on a software bootloader implementation and am using Python to send data from my PC and to receive acknowledgement signals. The problem I have run into is that I get this runtime error:

Error waiting for acknowledgement: device reports readiness to read but returned no data (device disconnected or multiple access on port?)

Here is my Python code (the code for the STM32F407 is not relevant):

import time
import tkinter as tk
from tkinter import ttk, filedialog, messagebox
import serial
import serial.tools.list_ports
import os

class FirmwareUpdater:
    """Tkinter front end that uploads a firmware image over a serial port.

    The upload protocol implemented here is: send a hold signal, send a
    metadata packet, then send the image in fixed-size chunks, each followed
    by a 4-byte XOR checksum.  Every step waits for a single ``b'A'`` byte
    from the device before continuing.
    """

    # Device names always offered in the port list, after any detected ports.
    DEFAULT_PORTS = ("/dev/ttyUSB0", "/dev/ttyUSB1", "/dev/ttyUSB2")
    BAUDRATE = 115200
    READ_TIMEOUT = 2  # seconds; used when the port is opened
    CHUNK_SIZE = 64  # bytes per data message; images are padded to a multiple
    MAX_RETRIES = 3  # attempts per chunk before the upload is aborted
    HOLD_SIGNAL = b'\x55\x66\x77\x88'
    ACK = b'A'
    PREAMBLE = b"STM32_BOOTLOADER_V01"  # 20-byte metadata preamble

    def __init__(self, root):
        """Attach the updater to *root* and build its widgets.

        :param root: Tk root window that hosts the interface
        """
        self.root = root
        self.root.title("STM32F407 Firmware Upgrader")
        self.root.geometry("600x400")

        self.binary_file = None  # path of the selected firmware file
        self.serial_port = None  # open port object while an upload is running

        self.create_gui()

    def create_gui(self):
        """Create the file, serial port, progress and control widgets."""
        # File Selection Frame
        file_frame = ttk.LabelFrame(self.root, text="Firmware File", padding="10")
        file_frame.pack(fill="x", padx=10, pady=5)

        self.file_path_var = tk.StringVar()
        ttk.Label(file_frame, textvariable=self.file_path_var).pack(side="left", fill="x", expand=True)
        ttk.Button(file_frame, text="Browse", command=self.browse_file).pack(side="right")

        # Serial Port Frame
        serial_frame = ttk.LabelFrame(self.root, text="Serial Port Settings", padding="10")
        serial_frame.pack(fill="x", padx=10, pady=5)

        ttk.Label(serial_frame, text="Port:").grid(row=0, column=0, padx=5)
        self.port_combo = ttk.Combobox(serial_frame, width=20)
        self.port_combo.grid(row=0, column=1, padx=5)

        # Progress Frame
        progress_frame = ttk.LabelFrame(self.root, text="Upload Progress", padding="10")
        progress_frame.pack(fill="x", padx=10, pady=5)

        self.progress_bar = ttk.Progressbar(progress_frame, orient=tk.HORIZONTAL, length=300, mode='determinate')
        self.progress_bar.pack(fill="x", pady=5)

        self.status_var = tk.StringVar(value="Ready")
        ttk.Label(progress_frame, textvariable=self.status_var).pack()

        # Control Buttons
        button_frame = ttk.Frame(self.root)
        button_frame.pack(fill="x", padx=10, pady=5)

        ttk.Button(button_frame, text="Start", command=self.start_upload).pack(side="right", padx=5)

        # Initialize ports list
        self.get_ports()

    def get_ports(self):
        """Populate the port combo box and preselect the first entry.

        Ports reported by ``serial.tools.list_ports.comports()`` come first,
        followed by the names in ``DEFAULT_PORTS`` that were not detected.
        """
        detected = sorted(info.device for info in serial.tools.list_ports.comports())
        ports = detected + [name for name in self.DEFAULT_PORTS if name not in detected]
        self.port_combo['values'] = ports
        if ports:
            self.port_combo.set(ports[0])

    def browse_file(self):
        """Ask for a ``*.bin`` file; remember its path and show its base name."""
        filename = filedialog.askopenfilename(
            filetypes=[("Binary files", "*.bin")]
        )
        if filename:
            self.binary_file = filename
            self.file_path_var.set(os.path.basename(filename))

    def start_upload(self):
        """Validate the selections, open the port and run the upload.

        Any exception raised while reading the file, opening the port or
        uploading is reported in an error dialog.  The serial port is always
        closed afterwards so that a later upload can reopen it.
        """
        if not self.binary_file:
            messagebox.showerror("Error", "Please select a firmware file first!")
            return

        port_name = self.port_combo.get()
        if not port_name:
            messagebox.showerror("Error", "Please select a serial port!")
            return

        try:
            print("Starting upload process...")
            # Read binary file
            with open(self.binary_file, 'rb') as f:
                firmware_data = f.read()

            # Just open the port once
            self.serial_port = serial.Serial(
                port=port_name,
                baudrate=self.BAUDRATE,
                timeout=self.READ_TIMEOUT,
            )

            # Pad before uploading: the metadata size and the chunking are
            # both computed from the padded image.
            firmware_data = self.add_padding_bytes(firmware_data)
            self.upload_firmware(firmware_data)

        except Exception as e:
            messagebox.showerror("Error", f"An error occurred: {str(e)}")
        finally:
            # Release the device whether the upload succeeded or failed.
            if self.serial_port is not None:
                self.serial_port.close()
                self.serial_port = None

    def get_metadata(self, firmware_data):
        """Build the metadata packet for *firmware_data*.

        :param firmware_data: image bytes whose length is advertised
        :return: preamble (20 bytes) + length (4 bytes, little-endian)
                 + XOR checksum of the preceding words (4 bytes)
        """
        header = self.PREAMBLE + len(firmware_data).to_bytes(4, 'little')
        return header + self.calculate_checksum_metadata(header)

    def calculate_checksum_metadata(self, data):
        """XOR *data* as consecutive little-endian 32-bit words.

        :param data: bytes to checksum; a trailing group shorter than four
                     bytes is read as a shorter little-endian integer
        :return: the checksum as 4 little-endian bytes
        """
        checksum = 0x00000000
        for offset in range(0, len(data), 4):
            checksum ^= int.from_bytes(data[offset:offset + 4], byteorder='little')
        return checksum.to_bytes(4, 'little')

    def calculate_checksum_firmware(self, data):
        """XOR all bytes of *data* together.

        :param data: bytes to checksum
        :return: the single-byte XOR widened to 4 little-endian bytes
        """
        checksum = 0x00000000
        for byte in data:
            checksum ^= byte
        return checksum.to_bytes(4, 'little')

    def add_padding_bytes(self, data):
        """Append zero bytes so ``len(data)`` is a multiple of ``CHUNK_SIZE``.

        :param data: image bytes
        :return: *data* unchanged if already aligned, otherwise zero-padded
        """
        padding_bytes = -len(data) % self.CHUNK_SIZE
        return data + b'\x00' * padding_bytes

    def send_hold_signal(self):
        """Write the 4-byte hold signal to the serial port."""
        self.serial_port.write(self.HOLD_SIGNAL)
        print("Hold signal sent")

    def send_metadata(self, firmware_data):
        """Write the metadata packet for *firmware_data* to the serial port."""
        self.serial_port.write(self.get_metadata(firmware_data))

    def send_checksum(self, firmware_data):
        """Write the byte-wise XOR checksum of *firmware_data* to the serial port."""
        self.serial_port.write(self.calculate_checksum_firmware(firmware_data))

    def data_send(self, byte_val):
        """Write a single byte value (0-255) to the serial port."""
        self.serial_port.write(bytes([byte_val]))

    def msg_send(self, buf):
        """Send *buf* followed by its 4-byte XOR checksum.

        The payload and checksum go out in one write call; the byte sequence
        on the wire is the payload bytes and then the checksum bytes.  No
        escaping is applied.

        :param buf: payload bytes
        """
        checksum = self.calculate_checksum_firmware(buf)
        self.serial_port.write(bytes(buf) + checksum)

    def wait_for_acknowledgement(self, timeout=2):
        """
        Wait for acknowledgement from bootloader

        :param timeout: Timeout in seconds
        :return: True if ACK received, False otherwise
        """
        port = self.serial_port
        previous_timeout = port.timeout
        try:
            # Only touch the port settings when the value actually differs.
            if previous_timeout != timeout:
                port.timeout = timeout

            # Read response
            response = port.read(1)

            # Check for ACK signal
            if response == self.ACK:
                print("Acknowledgement received")
                return True
            else:
                print(f"Unexpected response: {response}")
                return False

        except Exception as e:
            print(f"Error waiting for acknowledgement: {e}")
            return False
        finally:
            # Put back the timeout the port had before this call instead of
            # forcing None, which would make every later read block forever.
            if port.timeout != previous_timeout:
                port.timeout = previous_timeout

    def upload_firmware(self, firmware_data):
        """
        Upload firmware using simplified acknowledgement protocol

        :param firmware_data: image bytes to send, split into CHUNK_SIZE pieces
        :raises RuntimeError: if the hold signal, the metadata or any chunk
                              is not acknowledged
        """
        # Send hold signal and wait for ACK
        self.send_hold_signal()
        if not self.wait_for_acknowledgement():
            raise RuntimeError("Did not receive ACK for hold signal")

        # Send metadata and wait for ACK
        self.send_metadata(firmware_data)
        if not self.wait_for_acknowledgement():
            raise RuntimeError("Did not receive ACK for metadata")

        chunk_size = self.CHUNK_SIZE
        max_retries = self.MAX_RETRIES
        total_chunks = (len(firmware_data) + chunk_size - 1) // chunk_size

        # Configure progress bar
        self.progress_bar['maximum'] = total_chunks
        self.progress_bar['value'] = 0
        self.status_var.set("Uploading firmware...")

        for chunk_index in range(total_chunks):
            start = chunk_index * chunk_size
            chunk = firmware_data[start:start + chunk_size]
            chunk_number = chunk_index + 1

            for _attempt in range(max_retries):
                try:
                    self.msg_send(chunk)
                    acknowledged = self.wait_for_acknowledgement()
                except Exception as e:
                    print(f"Error sending chunk {chunk_number}: {e}")
                    continue
                if acknowledged:
                    break
                print(f"Error receiving ACK for chunk {chunk_number}. Retrying...")
            else:
                # Every attempt failed.
                self.status_var.set("Upload failed")
                raise RuntimeError(f"Failed to send chunk {chunk_number} after {max_retries} attempts")

            # Progress is updated outside the retry loop so that a GUI error
            # can never cause an already acknowledged chunk to be resent.
            self.progress_bar['value'] = chunk_number
            self.status_var.set(f"Uploading: {chunk_number}/{total_chunks} chunks")
            # Force GUI update
            self.root.update_idletasks()

            print(f"Chunk {chunk_number}/{total_chunks} sent successfully")

        # Upload complete
        self.progress_bar['value'] = total_chunks
        self.status_var.set("Firmware upload complete")

def main():
    """Create the Tk root window, attach the firmware updater and run the event loop."""
    window = tk.Tk()
    # Keep a reference to the updater for as long as the event loop runs.
    updater = FirmwareUpdater(window)
    window.mainloop()


if __name__ == "__main__":
    main()
发布评论

评论列表(0)

  1. 暂无评论