diff --git a/python-ecosys/debugpy/dap_monitor.py b/python-ecosys/debugpy/dap_monitor.py index b323a61cb..93d02ddf7 100644 --- a/python-ecosys/debugpy/dap_monitor.py +++ b/python-ecosys/debugpy/dap_monitor.py @@ -6,6 +6,7 @@ import json import time import sys +import argparse class DAPMonitor: def __init__(self, listen_port=5679, target_host='127.0.0.1', target_port=5678): @@ -15,35 +16,35 @@ def __init__(self, listen_port=5679, target_host='127.0.0.1', target_port=5678): self.target_port = target_port self.client_sock = None self.server_sock = None - + def start(self): """Start the DAP monitor proxy.""" print(f"DAP Monitor starting on port {self.listen_port}") print(f"Will forward to {self.target_host}:{self.target_port}") print("Start MicroPython debugpy server first, then connect VS Code to port 5679") - + # Create listening socket listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM) listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) listener.bind(('127.0.0.1', self.listen_port)) listener.listen(1) - + print(f"Listening for VS Code connection on port {self.listen_port}...") - + try: # Wait for VS Code to connect self.client_sock, client_addr = listener.accept() print(f"VS Code connected from {client_addr}") - + # Connect to MicroPython debugpy server self.server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.server_sock.connect((self.target_host, self.target_port)) print(f"Connected to MicroPython debugpy at {self.target_host}:{self.target_port}") - + # Start forwarding threads threading.Thread(target=self.forward_client_to_server, daemon=True).start() threading.Thread(target=self.forward_server_to_client, daemon=True).start() - + print("DAP Monitor active - press Ctrl+C to stop") while not self.disconnect: time.sleep(1) @@ -54,7 +55,7 @@ def start(self): print(f"Error: {e}") finally: self.cleanup() - + def forward_client_to_server(self): """Forward messages from VS Code client to MicroPython server.""" try: @@ -65,7 +66,7 @@ def forward_client_to_server(self): self.send_raw_data(self.server_sock, data) except Exception as e: print(f"Client->Server forwarding error: {e}") - + def forward_server_to_client(self): """Forward messages from MicroPython server to VS Code client.""" try: @@ -76,7 +77,7 @@ def forward_server_to_client(self): self.send_raw_data(self.client_sock, data) except Exception as e: print(f"Server->Client forwarding error: {e}") - + def receive_dap_message(self, sock, source): """Receive and log a DAP message.""" try: @@ -87,7 +88,7 @@ def receive_dap_message(self, sock, source): if not byte: return None header += byte - + # Parse content length header_str = header.decode('utf-8') content_length = 0 @@ -95,10 +96,10 @@ def receive_dap_message(self, sock, source): if line.startswith('Content-Length:'): content_length = int(line.split(':', 1)[1].strip()) break - + if content_length == 0: return None - + # Read content content = b"" while len(content) < content_length: @@ -106,7 +107,7 @@ def receive_dap_message(self, sock, source): if not chunk: return None content += chunk - + # Parse and Log the message message = self.parse_dap(source, content) self.log_dap_message(source, message) @@ -162,7 +163,7 @@ def send_raw_data(self, sock, data): sock.send(data) except Exception as e: print(f"Error sending data: {e}") - + def cleanup(self): """Clean up sockets.""" if self.client_sock: @@ -171,5 +172,16 @@ def cleanup(self): self.server_sock.close() if __name__ == "__main__": - monitor = DAPMonitor() - monitor.start() \ No newline at end of file 
+ + parser = argparse.ArgumentParser(description="DAP protocol monitor proxy") + parser.add_argument("--target-host", "--th", default="127.0.0.1", help="Target debugpy host (default: 127.0.0.1)") + parser.add_argument("--target-port", "--tp", type=int, default=5678, help="Target debugpy port (default: 5678)") + parser.add_argument("--listen-port", "--lp", type=int, default=5679, help="Port to listen for VS Code (default: 5679)") + args = parser.parse_args() + + monitor = DAPMonitor( + listen_port=args.listen_port, + target_host=args.target_host, + target_port=args.target_port + ) + monitor.start() diff --git a/python-ecosys/debugpy/debugpy/__init__.py b/python-ecosys/debugpy/debugpy/__init__.py index b7649bd5c..3912a49a5 100644 --- a/python-ecosys/debugpy/debugpy/__init__.py +++ b/python-ecosys/debugpy/debugpy/__init__.py @@ -11,10 +11,10 @@ from .common.constants import DEFAULT_HOST, DEFAULT_PORT __all__ = [ - "listen", - "wait_for_client", - "breakpoint", - "debug_this_thread", "DEFAULT_HOST", "DEFAULT_PORT", + "breakpoint", + "debug_this_thread", + "listen", + "wait_for_client", ] diff --git a/python-ecosys/debugpy/debugpy/common/messaging.py b/python-ecosys/debugpy/debugpy/common/messaging.py index bc264e3ff..a491578ad 100644 --- a/python-ecosys/debugpy/debugpy/common/messaging.py +++ b/python-ecosys/debugpy/debugpy/common/messaging.py @@ -6,25 +6,25 @@ class JsonMessageChannel: """Handles JSON message communication over a socket using DAP format.""" - + def __init__(self, sock, debug_callback=None): self.sock = sock self.seq = 0 self.closed = False self._recv_buffer = b"" self._debug_print = debug_callback or (lambda x: None) # Default to no-op - + def send_message(self, msg_type, command=None, **kwargs): """Send a DAP message.""" if self.closed: return - + self.seq += 1 message = { "seq": self.seq, "type": msg_type, } - + if command: if msg_type == MSG_TYPE_REQUEST: message["command"] = command @@ -42,20 +42,20 @@ def send_message(self, msg_type, command=None, **kwargs): message["event"] = command if kwargs: message["body"] = kwargs - + json_str = json.dumps(message) content = json_str.encode("utf-8") header = f"Content-Length: {len(content)}\r\n\r\n".encode("utf-8") - + try: self.sock.send(header + content) except OSError: self.closed = True - + def send_request(self, command, **kwargs): """Send a request message.""" self.send_message(MSG_TYPE_REQUEST, command, **kwargs) - + def send_response(self, command, request_seq, success=True, body=None, message=None): """Send a response message.""" kwargs = {"request_seq": request_seq, "success": success} @@ -63,27 +63,29 @@ def send_response(self, command, request_seq, success=True, body=None, message=N kwargs["body"] = body if message is not None: kwargs["message"] = message - - self._debug_print(f"[DAP] SEND: response {command} (req_seq={request_seq}, success={success})") + + self._debug_print( + f"[DAP] SEND: response {command} (req_seq={request_seq}, success={success})" + ) if body: self._debug_print(f"[DAP] body: {body}") if message: self._debug_print(f"[DAP] message: {message}") - + self.send_message(MSG_TYPE_RESPONSE, command, **kwargs) - + def send_event(self, event, **kwargs): """Send an event message.""" self._debug_print(f"[DAP] SEND: event {event}") if kwargs: self._debug_print(f"[DAP] body: {kwargs}") self.send_message(MSG_TYPE_EVENT, event, **kwargs) - + def recv_message(self): """Receive a DAP message.""" if self.closed: return None - + try: # Read headers while b"\r\n\r\n" not in self._recv_buffer: @@ -95,25 +97,25 @@ def 
recv_message(self): self._recv_buffer += data except OSError as e: # Handle timeout and other socket errors - if hasattr(e, 'errno') and e.errno in (11, 35): # EAGAIN, EWOULDBLOCK + if hasattr(e, "errno") and e.errno in (11, 35): # EAGAIN, EWOULDBLOCK return None # No data available self.closed = True return None - + header_end = self._recv_buffer.find(b"\r\n\r\n") header_str = self._recv_buffer[:header_end].decode("utf-8") - self._recv_buffer = self._recv_buffer[header_end + 4:] - + self._recv_buffer = self._recv_buffer[header_end + 4 :] + # Parse Content-Length content_length = 0 for line in header_str.split("\r\n"): if line.startswith("Content-Length:"): content_length = int(line.split(":", 1)[1].strip()) break - + if content_length == 0: return None - + # Read body while len(self._recv_buffer) < content_length: try: @@ -123,28 +125,30 @@ def recv_message(self): return None self._recv_buffer += data except OSError as e: - if hasattr(e, 'errno') and e.errno in (11, 35): # EAGAIN, EWOULDBLOCK + if hasattr(e, "errno") and e.errno in (11, 35): # EAGAIN, EWOULDBLOCK return None self.closed = True return None - + body = self._recv_buffer[:content_length] self._recv_buffer = self._recv_buffer[content_length:] - + # Parse JSON try: message = json.loads(body.decode("utf-8")) - self._debug_print(f"[DAP] Successfully received message: {message.get('type')} {message.get('command', message.get('event', 'unknown'))}") + self._debug_print( + f"[DAP] Successfully received message: {message.get('type')} {message.get('command', message.get('event', 'unknown'))}" + ) return message except (ValueError, UnicodeDecodeError) as e: print(f"[DAP] JSON parse error: {e}") return None - + except OSError as e: print(f"[DAP] Socket error in recv_message: {e}") self.closed = True return None - + def close(self): """Close the channel.""" self.closed = True diff --git a/python-ecosys/debugpy/debugpy/public_api.py b/python-ecosys/debugpy/debugpy/public_api.py index 137706efe..06b928965 100644 --- a/python-ecosys/debugpy/debugpy/public_api.py +++ b/python-ecosys/debugpy/debugpy/public_api.py @@ -1,6 +1,7 @@ """Public API for debugpy.""" import socket +import struct import sys from .common.constants import DEFAULT_HOST, DEFAULT_PORT from .server.debug_session import DebugSession @@ -10,58 +11,58 @@ def listen(port=DEFAULT_PORT, host=DEFAULT_HOST): """Start listening for debugger connections. 
- + Args: port: Port number to listen on (default: 5678) host: Host address to bind to (default: "127.0.0.1") - + Returns: (host, port) tuple of the actual listening address """ global _debug_session - + if _debug_session is not None: raise RuntimeError("Already listening for debugger") - + # Create listening socket listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM) try: listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) except: pass # Not supported in MicroPython - + # Use getaddrinfo for MicroPython compatibility addr_info = socket.getaddrinfo(host, port) addr = addr_info[0][-1] # Get the sockaddr listener.bind(addr) listener.listen(1) - + # getsockname not available in MicroPython, use original values print(f"Debugpy listening on {host}:{port}") - + # Wait for connection client_sock = None try: client_sock, client_addr = listener.accept() - print(f"Debugger connected from {client_addr}") - + print(f"Debugger connected from {format_client_addr(client_addr)}") + # Create debug session _debug_session = DebugSession(client_sock) - + # Handle just the initialize request, then return immediately print("[DAP] Waiting for initialize request...") init_message = _debug_session.channel.recv_message() - if init_message and init_message.get('command') == 'initialize': + if init_message and init_message.get("command") == "initialize": _debug_session._handle_message(init_message) print("[DAP] Initialize request handled - returning control immediately") else: print(f"[DAP] Warning: Expected initialize, got {init_message}") - + # Set socket to non-blocking for subsequent message processing _debug_session.channel.sock.settimeout(0.001) - + print("[DAP] Debug session ready - all other messages will be handled in trace function") - + except Exception as e: print(f"[DAP] Connection error: {e}") if client_sock: @@ -70,10 +71,32 @@ def listen(port=DEFAULT_PORT, host=DEFAULT_HOST): finally: # Only close the listener, not the client connection listener.close() - + return (host, port) +def format_client_addr(client_addr): + """Format client address using socket module methods""" + if isinstance(client_addr, (tuple, list)): + # Already in (ip, port) format + return f"{client_addr[0]}:{client_addr[1]}" + elif isinstance(client_addr, bytes) and len(client_addr) >= 8: + # Extract port (bytes 2-4, network byte order) + port = struct.unpack("!H", client_addr[2:4])[0] + # Extract IP address (bytes 4-8) using inet_ntoa + ip_packed = client_addr[4:8] + try: + # inet_ntoa expects 4-byte string in network byte order + ip_addr = socket.inet_ntoa(ip_packed) + return f"{ip_addr}:{port}" + except: + # Fallback if inet_ntoa not available (MicroPython) + ip_addr = ".".join(str(b) for b in ip_packed) + return f"{ip_addr}:{port}" + else: + return str(client_addr) + + def wait_for_client(): """Wait for the debugger client to connect and initialize.""" global _debug_session @@ -88,7 +111,7 @@ def breakpoint(): _debug_session.trigger_breakpoint() else: # Fallback to built-in breakpoint if available - if hasattr(__builtins__, 'breakpoint'): + if hasattr(__builtins__, "breakpoint"): __builtins__.breakpoint() @@ -99,7 +122,7 @@ def debug_this_thread(): _debug_session.debug_this_thread() else: # Install trace function even if no session yet - if hasattr(sys, 'settrace'): + if hasattr(sys, "settrace"): sys.settrace(_default_trace_func) else: raise RuntimeError("MICROPY_PY_SYS_SETTRACE required") @@ -111,7 +134,6 @@ def _default_trace_func(frame, event, arg): return None - def is_client_connected(): """Check if a 
debugger client is connected.""" global _debug_session diff --git a/python-ecosys/debugpy/debugpy/server/debug_session.py b/python-ecosys/debugpy/debugpy/server/debug_session.py index 3a1a5135d..c7553a604 100644 --- a/python-ecosys/debugpy/debugpy/server/debug_session.py +++ b/python-ecosys/debugpy/debugpy/server/debug_session.py @@ -3,35 +3,61 @@ import sys from ..common.messaging import JsonMessageChannel from ..common.constants import ( - CMD_INITIALIZE, CMD_LAUNCH, CMD_ATTACH, CMD_SET_BREAKPOINTS, - CMD_CONTINUE, CMD_NEXT, CMD_STEP_IN, CMD_STEP_OUT, CMD_PAUSE, - CMD_STACK_TRACE, CMD_SCOPES, CMD_VARIABLES, CMD_EVALUATE, CMD_DISCONNECT, - CMD_CONFIGURATION_DONE, CMD_THREADS, CMD_SOURCE, EVENT_INITIALIZED, EVENT_STOPPED, EVENT_CONTINUED, EVENT_TERMINATED, - STOP_REASON_BREAKPOINT, STOP_REASON_STEP, STOP_REASON_PAUSE, - TRACE_CALL, TRACE_LINE, TRACE_RETURN, TRACE_EXCEPTION + CMD_INITIALIZE, + CMD_LAUNCH, + CMD_ATTACH, + CMD_SET_BREAKPOINTS, + CMD_CONTINUE, + CMD_NEXT, + CMD_STEP_IN, + CMD_STEP_OUT, + CMD_PAUSE, + CMD_STACK_TRACE, + CMD_SCOPES, + CMD_VARIABLES, + CMD_EVALUATE, + CMD_DISCONNECT, + CMD_CONFIGURATION_DONE, + CMD_THREADS, + CMD_SOURCE, + EVENT_INITIALIZED, + EVENT_STOPPED, + EVENT_CONTINUED, + EVENT_TERMINATED, + STOP_REASON_BREAKPOINT, + STOP_REASON_STEP, + STOP_REASON_PAUSE, + TRACE_CALL, + TRACE_LINE, + TRACE_RETURN, + TRACE_EXCEPTION, ) from .pdb_adapter import PdbAdapter class DebugSession: """Manages a debugging session with a DAP client.""" - + def __init__(self, client_socket): self.debug_logging = False # Initialize first self.channel = JsonMessageChannel(client_socket, self._debug_print) self.pdb = PdbAdapter() - self.pdb._debug_session = self # Allow PDB to process messages during wait + self.pdb._debug_session = self # Allow PDB to process messages during wait # type: ignore self.initialized = False self.connected = True self.thread_id = 1 # Simple single-thread model self.stepping = False self.paused = False - + def _debug_print(self, message): """Print debug message only if debug logging is enabled.""" if self.debug_logging: print(message) - + + @property + def _baremetal(self) -> bool: + return sys.platform not in ("linux") # to be expanded + def start(self): """Start the debug session message loop.""" try: @@ -39,51 +65,51 @@ def start(self): message = self.channel.recv_message() if message is None: break - + self._handle_message(message) - + except Exception as e: print(f"Debug session error: {e}") finally: self.disconnect() - + def initialize_connection(self): """Initialize the connection - handle just the essential initial messages then return.""" # Note: debug_logging not available yet during init, so we always show these messages print("[DAP] Processing initial DAP messages...") - + try: # Process initial messages quickly and return control to main thread # We'll handle ongoing messages in the trace function attached = False message_count = 0 max_init_messages = 6 # Just handle the first few essential messages - + while message_count < max_init_messages and not attached: try: # Short timeout - don't block the main thread for long self.channel.sock.settimeout(1.0) message = self.channel.recv_message() if message is None: - print(f"[DAP] No more messages in initial batch") + print("[DAP] No more messages in initial batch") break - + print(f"[DAP] Initial message #{message_count + 1}: {message.get('command')}") self._handle_message(message) message_count += 1 - + # Just wait for attach, then we can return control - if message.get('command') == 'attach': + if 
message.get("command") == "attach": attached = True print("[DAP] ✅ Attach received - returning control to main thread") break - + except Exception as e: print(f"[DAP] Exception in initial processing: {e}") break finally: self.channel.sock.settimeout(None) - + # After attach, continue processing a few more messages quickly if attached: self._debug_print("[DAP] Processing remaining setup messages...") @@ -101,41 +127,41 @@ def initialize_connection(self): break finally: self.channel.sock.settimeout(None) - - print(f"[DAP] Initial setup complete - main thread can continue") - + + print("[DAP] Initial setup complete - main thread can continue") + except Exception as e: print(f"[DAP] Initialization error: {e}") - + def process_pending_messages(self): """Process any pending DAP messages without blocking.""" try: # Set socket to non-blocking mode for message processing self.channel.sock.settimeout(0.001) # Very short timeout - + while True: message = self.channel.recv_message() if message is None: break self._handle_message(message) - + except Exception: # No messages available or socket error pass finally: # Reset to blocking mode self.channel.sock.settimeout(None) - + def _handle_message(self, message): """Handle incoming DAP messages.""" msg_type = message.get("type") command = message.get("command", message.get("event", "unknown")) seq = message.get("seq", 0) - + self._debug_print(f"[DAP] RECV: {msg_type} {command} (seq={seq})") if message.get("arguments"): self._debug_print(f"[DAP] args: {message['arguments']}") - + if msg_type == "request": self._handle_request(message) elif msg_type == "response": @@ -144,13 +170,13 @@ def _handle_message(self, message): elif msg_type == "event": # We don't expect events from client self._debug_print(f"[DAP] Unexpected event from client: {message}") - + def _handle_request(self, message): """Handle DAP request messages.""" command = message.get("command") seq = message.get("seq", 0) args = message.get("arguments", {}) - + try: if command == CMD_INITIALIZE: self._handle_initialize(seq, args) @@ -187,13 +213,13 @@ def _handle_request(self, message): elif command == CMD_SOURCE: self._handle_source(seq, args) else: - self.channel.send_response(command, seq, success=False, - message=f"Unknown command: {command}") - + self.channel.send_response( + command, seq, success=False, message=f"Unknown command: {command}" + ) + except Exception as e: - self.channel.send_response(command, seq, success=False, - message=str(e)) - + self.channel.send_response(command, seq, success=False, message=str(e)) + def _handle_initialize(self, seq, args): """Handle initialize request.""" capabilities = { @@ -231,87 +257,99 @@ def _handle_initialize(self, seq, args): "supportsBreakpointLocationsRequest": False, "supportsClipboardContext": False, } - + self.channel.send_response(CMD_INITIALIZE, seq, body=capabilities) self.channel.send_event(EVENT_INITIALIZED) self.initialized = True - + def _handle_launch(self, seq, args): """Handle launch request.""" # For attach-mode debugging, we don't need to launch anything self.channel.send_response(CMD_LAUNCH, seq) - + def _handle_attach(self, seq, args): """Handle attach request.""" # Check if debug logging should be enabled self.debug_logging = args.get("logToFile", False) - + self._debug_print(f"[DAP] Processing attach request with args: {args}") - print(f"[DAP] Debug logging {'enabled' if self.debug_logging else 'disabled'} (logToFile={self.debug_logging})") - + print( + f"[DAP] Debug logging {'enabled' if self.debug_logging else 'disabled'} 
(logToFile={self.debug_logging})" + ) + + # get debugger root and debugee root from pathMappings + for pm in args.get("pathMappings", []): + # debugee - debugger + self.pdb.path_mappings.append((pm.get("remoteRoot", "./"), pm.get("localRoot", "./"))) + # # TODO: justMyCode, debugOptions , + # Enable trace function self.pdb.set_trace_function(self._trace_function) self.channel.send_response(CMD_ATTACH, seq) - + # After successful attach, we might need to send additional events # Some debuggers expect a 'process' event or thread events self._debug_print("[DAP] Attach completed, debugging is now active") - + def _handle_set_breakpoints(self, seq, args): """Handle setBreakpoints request.""" source = args.get("source", {}) filename = source.get("path", "") breakpoints = args.get("breakpoints", []) - + # Debug log the source information self._debug_print(f"[DAP] setBreakpoints source info: {source}") - + # Set breakpoints in pdb adapter actual_breakpoints = self.pdb.set_breakpoints(filename, breakpoints) - - self.channel.send_response(CMD_SET_BREAKPOINTS, seq, - body={"breakpoints": actual_breakpoints}) - + + self.channel.send_response( + CMD_SET_BREAKPOINTS, seq, body={"breakpoints": actual_breakpoints} + ) + def _handle_continue(self, seq, args): """Handle continue request.""" self.stepping = False self.paused = False self.pdb.continue_execution() self.channel.send_response(CMD_CONTINUE, seq) - + def _handle_next(self, seq, args): """Handle next (step over) request.""" self.stepping = True self.paused = False self.pdb.step_over() self.channel.send_response(CMD_NEXT, seq) - + def _handle_step_in(self, seq, args): """Handle stepIn request.""" self.stepping = True self.paused = False self.pdb.step_into() self.channel.send_response(CMD_STEP_IN, seq) - + def _handle_step_out(self, seq, args): """Handle stepOut request.""" self.stepping = True self.paused = False self.pdb.step_out() self.channel.send_response(CMD_STEP_OUT, seq) - + def _handle_pause(self, seq, args): """Handle pause request.""" self.paused = True self.pdb.pause() self.channel.send_response(CMD_PAUSE, seq) - + def _handle_stack_trace(self, seq, args): """Handle stackTrace request.""" stack_frames = self.pdb.get_stack_trace() - self.channel.send_response(CMD_STACK_TRACE, seq, - body={"stackFrames": stack_frames, "totalFrames": len(stack_frames)}) - + self.channel.send_response( + CMD_STACK_TRACE, + seq, + body={"stackFrames": stack_frames, "totalFrames": len(stack_frames)}, + ) + def _handle_scopes(self, seq, args): """Handle scopes request.""" frame_id = args.get("frameId", 0) @@ -319,111 +357,121 @@ def _handle_scopes(self, seq, args): scopes = self.pdb.get_scopes(frame_id) self._debug_print(f"[DAP] Generated scopes: {scopes}") self.channel.send_response(CMD_SCOPES, seq, body={"scopes": scopes}) - + def _handle_variables(self, seq, args): """Handle variables request.""" variables_ref = args.get("variablesReference", 0) variables = self.pdb.get_variables(variables_ref) self.channel.send_response(CMD_VARIABLES, seq, body={"variables": variables}) - + def _handle_evaluate(self, seq, args): """Handle evaluate request.""" expression = args.get("expression", "") frame_id = args.get("frameId") context = args.get("context", "watch") if not expression: - self.channel.send_response(CMD_EVALUATE, seq, success=False, - message="No expression provided") + self.channel.send_response( + CMD_EVALUATE, seq, success=False, message="No expression provided" + ) return try: result = self.pdb.evaluate_expression(expression, frame_id) - 
self.channel.send_response(CMD_EVALUATE, seq, body={ - "result": str(result), - "variablesReference": 0 - }) + self.channel.send_response( + CMD_EVALUATE, seq, body={"result": str(result), "variablesReference": 0} + ) except Exception as e: - self.channel.send_response(CMD_EVALUATE, seq, success=False, - message=str(e)) - + self.channel.send_response(CMD_EVALUATE, seq, success=False, message=str(e)) + def _handle_disconnect(self, seq, args): """Handle disconnect request.""" self.channel.send_response(CMD_DISCONNECT, seq) self.disconnect() - + def _handle_configuration_done(self, seq, args): """Handle configurationDone request.""" # This indicates that the client has finished configuring breakpoints # and is ready to start debugging self.channel.send_response(CMD_CONFIGURATION_DONE, seq) - + def _handle_threads(self, seq, args): """Handle threads request.""" # MicroPython is single-threaded, so return one thread - threads = [{ - "id": self.thread_id, - "name": "main" - }] + threads = [{"id": self.thread_id, "name": "main"}] self.channel.send_response(CMD_THREADS, seq, body={"threads": threads}) - + def _handle_source(self, seq, args): """Handle source request.""" source = args.get("source", {}) source_path = source.get("path", "") - + if self._baremetal or not source_path: + # BUGBUG: unable to read the source on ESP32 + # Possible an effect of the import / inialization sequence ? + # Nothe that other source files ( other.py) do not seem to get requested in the same way + self.channel.send_response(CMD_SOURCE, seq, success=False) + return + self._debug_print(f"[DAP] Processing source request for path: {source}") try: # Try to read the source file - with open(source_path, 'r') as f: + with open(source_path) as f: content = f.read() self.channel.send_response(CMD_SOURCE, seq, body={"content": content}) - except Exception as e: - self.channel.send_response(CMD_SOURCE, seq, success=False, - message=f"Could not read source: {e}") - + except Exception: + self.channel.send_response( + CMD_SOURCE, + seq, + success=False, + message="cancelled", + # message=f"Could not read source: {e}" + ) + def _trace_function(self, frame, event, arg): """Trace function called by sys.settrace.""" # Process any pending DAP messages frequently self.process_pending_messages() - + # Handle breakpoints and stepping if self.pdb.should_stop(frame, event, arg): - self._send_stopped_event(STOP_REASON_BREAKPOINT if self.pdb.hit_breakpoint else - STOP_REASON_STEP if self.stepping else STOP_REASON_PAUSE) + self._send_stopped_event( + STOP_REASON_BREAKPOINT + if self.pdb.hit_breakpoint + else STOP_REASON_STEP + if self.stepping + else STOP_REASON_PAUSE + ) # Wait for continue command self.pdb.wait_for_continue() - + return self._trace_function - + def _send_stopped_event(self, reason): """Send stopped event to client.""" - self.channel.send_event(EVENT_STOPPED, - reason=reason, - threadId=self.thread_id, - allThreadsStopped=True) - + self.channel.send_event( + EVENT_STOPPED, reason=reason, threadId=self.thread_id, allThreadsStopped=True + ) + def wait_for_client(self): """Wait for client to initialize.""" # This is a simplified version - in a real implementation # we might want to wait for specific initialization steps - pass - + def trigger_breakpoint(self): """Trigger a manual breakpoint.""" if self.initialized: self._send_stopped_event(STOP_REASON_BREAKPOINT) - + def debug_this_thread(self): """Enable debugging for current thread.""" - if hasattr(sys, 'settrace'): + if hasattr(sys, "settrace"): 
sys.settrace(self._trace_function) - + def is_connected(self): """Check if client is connected.""" return self.connected and not self.channel.closed - + def disconnect(self): """Disconnect from client.""" self.connected = False - if hasattr(sys, 'settrace'): + if hasattr(sys, "settrace"): sys.settrace(None) self.pdb.cleanup() self.channel.close() diff --git a/python-ecosys/debugpy/debugpy/server/pdb_adapter.py b/python-ecosys/debugpy/debugpy/server/pdb_adapter.py index 204862073..11b4dfb81 100644 --- a/python-ecosys/debugpy/debugpy/server/pdb_adapter.py +++ b/python-ecosys/debugpy/debugpy/server/pdb_adapter.py @@ -3,17 +3,93 @@ import sys import time import os +import json + +Any = object from ..common.constants import ( - TRACE_CALL, TRACE_LINE, TRACE_RETURN, TRACE_EXCEPTION, - SCOPE_LOCALS, SCOPE_GLOBALS + TRACE_CALL, + TRACE_LINE, + TRACE_RETURN, + TRACE_EXCEPTION, + SCOPE_LOCALS, + SCOPE_GLOBALS, ) +VARREF_LOCALS = 1 +VARREF_GLOBALS = 2 +VARREF_LOCALS_SPECIAL = 3 +VARREF_GLOBALS_SPECIAL = 4 + +# New constants for complex variable references +VARREF_COMPLEX_BASE = 10000 # Base for complex variable references +MAX_CACHE_SIZE = 50 # Limit cache size for memory constraints + + +class VariableReferenceCache: + """Lightweight cache for complex variable references optimized for MicroPython.""" + + def __init__(self, max_size: int = MAX_CACHE_SIZE): + self.cache: dict[int, Any] = {} + self.insertion_order: list[int] = [] # Track insertion order for proper FIFO + self.next_ref: int = VARREF_COMPLEX_BASE + self.max_size: int = max_size + + def add_variable(self, value: Any) -> int: + """Add a complex variable and return its reference ID.""" + # Clean cache if approaching limit + if len(self.cache) >= self.max_size: + self._cleanup_oldest() + + ref_id = self.next_ref + self.cache[ref_id] = value + self.insertion_order.append(ref_id) + self.next_ref += 1 + return ref_id + + def get_variable(self, ref_id: int): # -> Optional[Any] + """Get variable by reference ID.""" + return self.cache.get(ref_id) + + def _cleanup_oldest(self) -> None: + """Remove oldest entries to free memory.""" + if self.cache and self.insertion_order: + # Remove first quarter of entries (true FIFO based on insertion order) + to_remove = max(1, len(self.cache) // 4) # Remove at least 1 entry + keys_to_remove = self.insertion_order[:to_remove] + for key in keys_to_remove: + if key in self.cache: + del self.cache[key] + # Update insertion order + self.insertion_order = self.insertion_order[to_remove:] + + def clear(self) -> None: + """Clear all cached variables.""" + self.cache.clear() + self.insertion_order.clear() + + +# Also try checking by basename for path mismatches +def basename(path: str): + return path.split("/")[-1] if "/" in path else path + + +# Check if this might be a relative path match +def ends_with_path(full_path: str, relative_path: str): + """Check if full_path ends with relative_path components.""" + full_parts = full_path.replace("\\", "/").split("/") + rel_parts = relative_path.replace("\\", "/").split("/") + if len(rel_parts) > len(full_parts): + return False + return full_parts[-len(rel_parts) :] == rel_parts + class PdbAdapter: """Adapter between DAP protocol and MicroPython's sys.settrace functionality.""" - + def __init__(self): - self.breakpoints = {} # filename -> {line_no: breakpoint_info} + self.breakpoints: dict[ + str, dict[int, dict] + ] = {} # filename -> {line_no: breakpoint_info} # todo - simplify - reduce info stored self.current_frame = None self.step_mode = None # None, 'over', 'into', 
'out' self.step_frame = None @@ -21,313 +97,511 @@ def __init__(self): self.hit_breakpoint = False self.continue_event = False self.variables_cache = {} # frameId -> variables + self.var_cache = VariableReferenceCache() # Enhanced variable reference cache self.frame_id_counter = 1 - self.path_mapping = {} # runtime_path -> vscode_path mapping - + self.path_mappings: list[ + tuple[str, str] + ] = [] # runtime_path -> vscode_path mapping # todo: move to session level + self.file_mappings: dict[ + str, str + ] = {} # runtime_path -> vscode_path mapping # todo : merge with .breakpoints + def _debug_print(self, message): """Print debug message only if debug logging is enabled.""" - if hasattr(self, '_debug_session') and self._debug_session.debug_logging: + if hasattr(self, "_debug_session") and self._debug_session.debug_logging: # type: ignore print(message) - - def _normalize_path(self, path): + + def _normalize_path(self, path: str): """Normalize a file path for consistent comparisons.""" # Convert to absolute path if possible try: - if hasattr(os.path, 'abspath'): + if hasattr(os.path, "abspath"): path = os.path.abspath(path) - elif hasattr(os.path, 'realpath'): + elif hasattr(os.path, "realpath"): path = os.path.realpath(path) except: pass - # Ensure consistent separators - path = path.replace('\\', '/') + path = path.replace("\\", "/") return path - + def set_trace_function(self, trace_func): """Install the trace function.""" - if hasattr(sys, 'settrace'): + if hasattr(sys, "settrace"): sys.settrace(trace_func) else: raise RuntimeError("sys.settrace not available") - - def set_breakpoints(self, filename, breakpoints): + + def _filename_as_debugee(self, path: str): + # check if we have a 1:1 file mapping for this path + if self.file_mappings.get(path): + return self.file_mappings[path] + # Check if we have a folder mapping for this path + for runtime_path, vscode_path in self.path_mappings: + if path.startswith(vscode_path): + path = path.replace(vscode_path, runtime_path, 1) + if path.startswith("//"): + path = path[1:] + # If no mapping found, return the original path + return path + + def _filename_as_debugger(self, path: str): + """Convert a file path to the debugger's expected format.""" + path = path or "" + if not path: + return path + if path.startswith("<"): + # Special case for or similar + return path + # Check if we have a 1:1 file mapping for this path + for runtime_path, vscode_path in self.path_mappings: + if path.startswith(runtime_path): + path = path.replace(runtime_path, vscode_path, 1) + return path + + # Check if we have a folder mapping for this path + for runtime_path, vscode_path in self.path_mappings: + if path.startswith(runtime_path): + path = path.replace(runtime_path, vscode_path, 1) + if path.startswith("//"): + path = path[1:] + # If no mapping found, return the original path + return path + + def set_breakpoints(self, filename: str, breakpoints: list[dict]): """Set breakpoints for a file.""" self.breakpoints[filename] = {} + local_name = self._filename_as_debugee(filename) + self.file_mappings[local_name] = filename actual_breakpoints = [] - - # Debug log the breakpoint path self._debug_print(f"[PDB] Setting breakpoints for file: {filename}") - + for bp in breakpoints: line = bp.get("line") if line: + if local_name != filename: + self.breakpoints[local_name] = {} + self._debug_print(f"[>>>] Setting breakpoints for local: {local_name}:{line}") + self.breakpoints[local_name][line] = { + "line": line, + "verified": True, + "source": {"path": filename}, + } 
self.breakpoints[filename][line] = { "line": line, "verified": True, - "source": {"path": filename} + "source": {"path": filename}, } - actual_breakpoints.append({ - "line": line, - "verified": True, - "source": {"path": filename} - }) - + actual_breakpoints.append( + {"line": line, "verified": True, "source": {"path": filename}} + ) + + self._debug_print(f"[PDB] Breakpoints set : {self.breakpoints}") + return actual_breakpoints - - def should_stop(self, frame, event, arg): + + def should_stop(self, frame, event: str, arg): """Determine if execution should stop at this point.""" self.current_frame = frame self.hit_breakpoint = False - + # Get frame information filename = frame.f_code.co_filename lineno = frame.f_lineno - - # Debug: print filename and line for debugging - if event == TRACE_LINE and lineno in [20, 21, 22, 23, 24]: # Only log lines near our breakpoints - self._debug_print(f"[PDB] Checking {filename}:{lineno} (event={event})") - self._debug_print(f"[PDB] Available breakpoint files: {list(self.breakpoints.keys())}") - # Check for exact filename match first if filename in self.breakpoints: if lineno in self.breakpoints[filename]: self._debug_print(f"[PDB] HIT BREAKPOINT (exact match) at {filename}:{lineno}") # Record the path mapping (in this case, they're already the same) - self.path_mapping[filename] = filename + self.file_mappings[filename] = self._filename_as_debugger(filename) self.hit_breakpoint = True return True - - # Also try checking by basename for path mismatches - def basename(path): - return path.split('/')[-1] if '/' in path else path - - # Check if this might be a relative path match - def ends_with_path(full_path, relative_path): - """Check if full_path ends with relative_path components.""" - full_parts = full_path.replace('\\', '/').split('/') - rel_parts = relative_path.replace('\\', '/').split('/') - if len(rel_parts) > len(full_parts): - return False - return full_parts[-len(rel_parts):] == rel_parts - - file_basename = basename(filename) - self._debug_print(f"[PDB] Fallback basename match: '{file_basename}' vs available files") - for bp_file in self.breakpoints: - bp_basename = basename(bp_file) - self._debug_print(f"[PDB] Comparing '{file_basename}' == '{bp_basename}' ?") - if bp_basename == file_basename: - self._debug_print(f"[PDB] Basename match found! Checking line {lineno} in {list(self.breakpoints[bp_file].keys())}") - if lineno in self.breakpoints[bp_file]: - self._debug_print(f"[PDB] HIT BREAKPOINT (fallback basename match) at {filename}:{lineno} -> {bp_file}") - # Record the path mapping so we can report the correct path in stack traces - self.path_mapping[filename] = bp_file - self.hit_breakpoint = True - return True - - # Also check if the runtime path might be relative and the breakpoint path absolute - if ends_with_path(bp_file, filename): - self._debug_print(f"[PDB] Relative path match: {bp_file} ends with {filename}") - if lineno in self.breakpoints[bp_file]: - self._debug_print(f"[PDB] HIT BREAKPOINT (relative path match) at {filename}:{lineno} -> {bp_file}") - # Record the path mapping so we can report the correct path in stack traces - self.path_mapping[filename] = bp_file - self.hit_breakpoint = True - return True - + # path/file.py matched - but not the line number - keep running + else: + # file not (yet) matched - this is slow so we do not want to do this often. 
+ # TODO: use builins - sys.path method to find the file + # if we have a path match , but no breakpoints - add it to the file_mappings dict avoid this check + self.breakpoints[filename] = {} # Ensure the filename is in the breakpoints dict + if not filename in self.file_mappings: + self.file_mappings[filename] = self._filename_as_debugger(filename) + self._debug_print( + f"[PDB] add mapping for :'{filename}' -> '{self.file_mappings[filename]}'" + ) + # Check stepping - if self.step_mode == 'into': + if self.step_mode == "into": if event in (TRACE_CALL, TRACE_LINE): self.step_mode = None return True - - elif self.step_mode == 'over': + + elif self.step_mode == "over": if event == TRACE_LINE and frame == self.step_frame: self.step_mode = None return True elif event == TRACE_RETURN and frame == self.step_frame: # Continue stepping in caller - if hasattr(frame, 'f_back') and frame.f_back: + if hasattr(frame, "f_back") and frame.f_back: self.step_frame = frame.f_back else: self.step_mode = None - - elif self.step_mode == 'out': + + elif self.step_mode == "out": if event == TRACE_RETURN and frame == self.step_frame: self.step_mode = None return True - + return False - + def continue_execution(self): """Continue execution.""" self.step_mode = None self.continue_event = True - + def step_over(self): """Step over (next line).""" - self.step_mode = 'over' + self.step_mode = "over" self.step_frame = self.current_frame self.continue_event = True - + def step_into(self): """Step into function calls.""" - self.step_mode = 'into' + self.step_mode = "into" self.continue_event = True - + def step_out(self): """Step out of current function.""" - self.step_mode = 'out' + self.step_mode = "out" self.step_frame = self.current_frame self.continue_event = True - + def pause(self): """Pause execution at next opportunity.""" # This is handled by the debug session - pass - + def wait_for_continue(self): """Wait for continue command (simplified implementation).""" # In a real implementation, this would block until continue # For MicroPython, we'll use a simple polling approach self.continue_event = False - + # Process DAP messages while waiting for continue self._debug_print("[PDB] Waiting for continue command...") while not self.continue_event: # Process any pending DAP messages (scopes, variables, etc.) 
- if hasattr(self, '_debug_session'): - self._debug_session.process_pending_messages() + if hasattr(self, "_debug_session"): + self._debug_session.process_pending_messages() # type: ignore time.sleep(0.01) - + def get_stack_trace(self): """Get the current stack trace.""" if not self.current_frame: return [] - + frames = [] frame = self.current_frame frame_id = 0 - + while frame: filename = frame.f_code.co_filename name = frame.f_code.co_name line = frame.f_lineno - + if "" in filename or filename.endswith("debugpy.py"): + hint = "subtle" + else: + hint = "normal" + + # self._debug_print("=" * 40 ) + # self._debug_print(f"[PDB] file mappings: {repr(self.file_mappings)} " ) + # self._debug_print(f"[PDB] path mappings: {repr(self.path_mappings)}" ) + # self._debug_print("=" * 40 ) + # Use the VS Code path if we have a mapping, otherwise use the original path - display_path = self.path_mapping.get(filename, filename) - if filename != display_path: - self._debug_print(f"[PDB] Stack trace path mapping: {filename} -> {display_path}") - - # Create frame info - frames.append({ - "id": frame_id, - "name": name, - "source": {"path": display_path}, - "line": line, - "column": 1, - "endLine": line, - "endColumn": 1 - }) - + debugger_path = self._filename_as_debugger(filename) + if filename != debugger_path: + self._debug_print(f"[PDB] Stack trace path mapping: {filename} -> {debugger_path}") + # Create StackFrame info + frames.append( + { + "id": frame_id, + "name": name, + "source": {"path": debugger_path}, + "line": line, + "column": 1, + "endLine": line, + "endColumn": 1, + "presentationHint": hint, + } + ) + # Cache frame for variable access self.variables_cache[frame_id] = frame - + # MicroPython doesn't have f_back attribute - if hasattr(frame, 'f_back'): + if hasattr(frame, "f_back"): frame = frame.f_back else: # Only return the current frame for MicroPython break frame_id += 1 - + return frames - + def get_scopes(self, frame_id): """Get variable scopes for a frame.""" scopes = [ { - "name": "Locals", - "variablesReference": frame_id * 1000 + 1, - "expensive": False + "name": SCOPE_LOCALS, + "variablesReference": frame_id * 1000 + VARREF_LOCALS, + "expensive": False, }, { - "name": "Globals", - "variablesReference": frame_id * 1000 + 2, - "expensive": False - } + "name": SCOPE_GLOBALS, + "variablesReference": frame_id * 1000 + VARREF_GLOBALS, + "expensive": False, + }, ] return scopes - + + def _process_special_variables(self, var_dict): + """Process special variables (those starting and ending with __).""" + variables = [] + for name, value in var_dict.items(): + if name.startswith("__") and name.endswith("__"): + try: + value_str = json.dumps(value) + type_str = type(value).__name__ + variables.append( + { + "name": name, + "value": value_str, + "type": type_str, + "variablesReference": 0, + } + ) + except Exception: + variables.append(self._var_error(name)) + return variables + + def _process_regular_variables(self, var_dict): + """Process regular variables (excluding special ones).""" + variables = [] + for name, value in var_dict.items(): + # Skip private/internal variables + if name.startswith("__") and name.endswith("__"): + continue + variables.append(self._get_variable_info(name, value)) + return variables + + def _is_expandable(self, value: Any) -> bool: + """Check if a variable can be expanded (has child elements).""" + return isinstance(value, (dict, list, tuple, set)) + + def _get_preview(self, value: Any, fallback_text: str = "") -> str: + """Get a truncated preview of a variable 
value.""" + try: + if value is None: + return "None" + + # Try to get a meaningful representation + preview_repr = repr(value) + if len(preview_repr) > 30: + return preview_repr[:30] + "..." + else: + return preview_repr + except (TypeError, ValueError): + # If repr fails, try str + try: + preview_str = str(value) + if len(preview_str) > 30: + return preview_str[:30] + "..." + else: + return preview_str + except: + # Final fallback + return fallback_text or f"<{type(value).__name__} object>" + + def _get_variable_info(self, name: str, value: Any) -> dict[str, str | int]: + """Get DAP-compliant variable information with proper type handling.""" + try: + # Handle expandable types + if self._is_expandable(value): + var_ref = self.var_cache.add_variable(value) + + if isinstance(value, dict): + preview = ( + self._get_preview(value, f"dict({len(value)} items)") + if value + else "dict(empty)" + ) + return { + "name": name, + "value": preview, + "type": "dict", + "variablesReference": var_ref, + "namedVariables": len(value), + "indexedVariables": 0, + } + elif isinstance(value, list): + preview = ( + self._get_preview(value, f"list({len(value)} items)") + if value + else "list(empty)" + ) + return { + "name": name, + "value": preview, + "type": "list", + "variablesReference": var_ref, + "indexedVariables": len(value), + "namedVariables": 0, + } + elif isinstance(value, tuple): + preview = ( + self._get_preview(value, f"tuple({len(value)} items)") + if value + else "tuple(empty)" + ) + return { + "name": name, + "value": preview, + "type": "tuple", + "variablesReference": var_ref, + "indexedVariables": len(value), + "namedVariables": 0, + } + elif isinstance(value, set): + preview = ( + self._get_preview(value, f"set({len(value)} items)") + if value + else "set(empty)" + ) + return { + "name": name, + "value": preview, + "type": "set", + "variablesReference": var_ref, + "indexedVariables": len(value), + "namedVariables": 0, + } + + # Simple types - use the preview helper + preview = self._get_preview(value) + + return { + "name": name, + "value": preview, + "type": type(value).__name__, + "variablesReference": 0, + } + except Exception: + return self._var_error(name) + + def _expand_complex_variable(self, ref_id: int) -> list[dict[str, str | int]]: + """Expand a complex variable into its child elements.""" + value = self.var_cache.get_variable(ref_id) + if value is None: + return [] + + variables = [] + try: + if isinstance(value, dict): + # Handle dictionary keys and values + for key, val in value.items(): + key_str = str(key) + variables.append(self._get_variable_info(key_str, val)) + elif isinstance(value, (list, tuple)): + # Handle list/tuple elements + for i, val in enumerate(value): + variables.append(self._get_variable_info(f"[{i}]", val)) + elif isinstance(value, set): + # Handle set elements (sorted for consistent display) + for i, val in enumerate(sorted(value, key=lambda x: str(x))): + variables.append(self._get_variable_info(f"<{i}>", val)) + except Exception as e: + # Return error info for debugging + variables.append( + { + "name": "error", + "value": f"Failed to expand: {e}", + "type": "error", + "variablesReference": 0, + } + ) + + return variables + + @staticmethod + def _var_error(name: str): + return {"name": name, "value": "", "type": "unknown", "variablesReference": 0} + + @staticmethod + def _special_vars(varref: int): + return {"name": "Special", "value": "", "variablesReference": varref} + def get_variables(self, variables_ref): - """Get variables for a scope.""" + """Get 
variables for a scope with enhanced complex variable support.""" + # Handle complex variable expansion + if variables_ref >= VARREF_COMPLEX_BASE: + return self._expand_complex_variable(variables_ref) + frame_id = variables_ref // 1000 scope_type = variables_ref % 1000 - + if frame_id not in self.variables_cache: return [] - + frame = self.variables_cache[frame_id] + + # Handle special scope types first + if scope_type == VARREF_LOCALS_SPECIAL: + var_dict = frame.f_locals if hasattr(frame, "f_locals") else {} + return self._process_special_variables(var_dict) + elif scope_type == VARREF_GLOBALS_SPECIAL: + var_dict = frame.f_globals if hasattr(frame, "f_globals") else {} + return self._process_special_variables(var_dict) + + # Handle regular scope types with special folder variables = [] - - if scope_type == 1: # Locals - var_dict = frame.f_locals if hasattr(frame, 'f_locals') else {} - elif scope_type == 2: # Globals - var_dict = frame.f_globals if hasattr(frame, 'f_globals') else {} + if scope_type == VARREF_LOCALS: + var_dict = frame.f_locals if hasattr(frame, "f_locals") else {} + variables.append(self._special_vars(frame_id * 1000 + VARREF_LOCALS_SPECIAL)) + elif scope_type == VARREF_GLOBALS: + var_dict = frame.f_globals if hasattr(frame, "f_globals") else {} + variables.append(self._special_vars(frame_id * 1000 + VARREF_GLOBALS_SPECIAL)) else: + # Invalid reference, return empty return [] - - for name, value in var_dict.items(): - # Skip private/internal variables - if name.startswith('__') and name.endswith('__'): - continue - - try: - value_str = repr(value) - type_str = type(value).__name__ - - variables.append({ - "name": name, - "value": value_str, - "type": type_str, - "variablesReference": 0 # Simple implementation - no nested objects - }) - except Exception: - variables.append({ - "name": name, - "value": "", - "type": "unknown", - "variablesReference": 0 - }) - + + # Add regular variables with enhanced processing + variables.extend(self._process_regular_variables(var_dict)) return variables - + def evaluate_expression(self, expression, frame_id=None): """Evaluate an expression in the context of a frame.""" if frame_id is not None and frame_id in self.variables_cache: frame = self.variables_cache[frame_id] - globals_dict = frame.f_globals if hasattr(frame, 'f_globals') else {} - locals_dict = frame.f_locals if hasattr(frame, 'f_locals') else {} + globals_dict = frame.f_globals if hasattr(frame, "f_globals") else {} + locals_dict = frame.f_locals if hasattr(frame, "f_locals") else {} else: # Use current frame frame = self.current_frame if frame: - globals_dict = frame.f_globals if hasattr(frame, 'f_globals') else {} - locals_dict = frame.f_locals if hasattr(frame, 'f_locals') else {} + globals_dict = frame.f_globals if hasattr(frame, "f_globals") else {} + locals_dict = frame.f_locals if hasattr(frame, "f_locals") else {} else: globals_dict = globals() locals_dict = {} - try: # Evaluate the expression result = eval(expression, globals_dict, locals_dict) return result except Exception as e: raise Exception(f"Evaluation error: {e}") - + def cleanup(self): - """Clean up resources.""" + """Clean up resources with enhanced cache management.""" self.variables_cache.clear() + self.var_cache.clear() # Clear variable reference cache self.breakpoints.clear() - if hasattr(sys, 'settrace'): + if hasattr(sys, "settrace"): sys.settrace(None) diff --git a/python-ecosys/debugpy/demo.py b/python-ecosys/debugpy/demo.py index d5b3d0923..02a927257 100644 --- a/python-ecosys/debugpy/demo.py +++ 
b/python-ecosys/debugpy/demo.py @@ -16,10 +16,10 @@ def main(): print("MicroPython debugpy Demo") print("========================") print() - + # Demonstrate trace functionality print("1. Testing trace functionality:") - + def trace_function(frame, event, arg): if event == 'call': print(f" -> Entering function: {frame.f_code.co_name}") @@ -28,34 +28,34 @@ def trace_function(frame, event, arg): elif event == 'return': print(f" -> Returning from {frame.f_code.co_name} with value: {arg}") return trace_function - + # Enable tracing sys.settrace(trace_function) - + # Execute traced function result = simple_function(5, 3) - + # Disable tracing sys.settrace(None) - + print(f"Result: {result}") print() - + # Demonstrate debugpy components print("2. Testing debugpy components:") - + # Test PDB adapter from debugpy.server.pdb_adapter import PdbAdapter pdb = PdbAdapter() - + # Set some mock breakpoints breakpoints = pdb.set_breakpoints("demo.py", [{"line": 10}, {"line": 15}]) print(f" Set breakpoints: {len(breakpoints)} breakpoints") - + # Test messaging from debugpy.common.messaging import JsonMessageChannel print(" JsonMessageChannel available") - + print() print("3. debugpy is ready for VS Code integration!") print(" To use with VS Code:") @@ -63,6 +63,6 @@ def trace_function(frame, event, arg): print(" - Call debugpy.listen() to start the debug server") print(" - Connect VS Code using the 'Attach to MicroPython' configuration") print(" - Set breakpoints and debug normally") - + if __name__ == "__main__": main() diff --git a/python-ecosys/debugpy/test_vscode.py b/python-ecosys/debugpy/test_vscode.py index 2dca82d34..9a5672822 100644 --- a/python-ecosys/debugpy/test_vscode.py +++ b/python-ecosys/debugpy/test_vscode.py @@ -23,7 +23,7 @@ def debuggable_code(): """The actual code we want to debug - wrapped in a function so sys.settrace will trace it.""" global foo print("Starting debuggable code...") - + # Test data - set breakpoint here (using smaller numbers to avoid slow fibonacci) numbers = [3, 4, 5] for i, num in enumerate(numbers): @@ -34,18 +34,18 @@ def debuggable_code(): print(sys.implementation) import machine print(dir(machine)) - + # Test manual breakpoint print("\nTriggering manual breakpoint...") debugpy.breakpoint() print("Manual breakpoint triggered!") - + print("Test completed successfully!") def main(): print("MicroPython VS Code Debugging Test") print("==================================") - + # Start debug server try: debugpy.listen() @@ -57,22 +57,22 @@ def main(): # input() # except: # pass - + # Enable debugging for this thread debugpy.debug_this_thread() - + # Give VS Code a moment to set breakpoints after attach print("\nGiving VS Code time to set breakpoints...") import time time.sleep(2) - + # Call the debuggable code function so it gets traced debuggable_code() - + except KeyboardInterrupt: print("\nTest interrupted by user") except Exception as e: print(f"Error: {e}") if __name__ == "__main__": - main() \ No newline at end of file + main()
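
For context, a minimal usage sketch of the API exercised by this diff, assuming the defaults visible in the code above (device-side server on 127.0.0.1:5678, optional dap_monitor.py proxy listening on 5679). The work() helper below is purely illustrative; the overall flow mirrors test_vscode.py.

    # Device side (MicroPython built with MICROPY_PY_SYS_SETTRACE):
    # start the DAP server, install the trace hook, then run the code to debug.
    import debugpy

    debugpy.listen()             # binds 127.0.0.1:5678 and waits for VS Code to attach
    debugpy.debug_this_thread()  # installs the sys.settrace hook for this thread

    def work():                  # illustrative helper, not part of the library
        total = 0
        for i in range(3):
            total += i           # set a VS Code breakpoint on this line, or...
        debugpy.breakpoint()     # ...trigger one programmatically
        return total

    print(work())

    # Host side (CPython), optional DAP traffic logging via the proxy added above:
    #   python dap_monitor.py --listen-port 5679 --target-host 127.0.0.1 --target-port 5678
    # then attach VS Code to port 5679 instead of 5678.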