diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml new file mode 100644 index 00000000..56d18a9a --- /dev/null +++ b/.github/workflows/release.yaml @@ -0,0 +1,133 @@ +name: release + +on: + push: + tags: + - "[0-9]+.[0-9]+.[0-9]+" + - "[0-9]+.[0-9]+.[0-9]+a[0-9]+" + - "[0-9]+.[0-9]+.[0-9]+b[0-9]+" + - "[0-9]+.[0-9]+.[0-9]+rc[0-9]+" + +env: + PACKAGE_NAME: "chordspy" + OWNER: "Upside Down Labs" + +jobs: + details: + runs-on: ubuntu-latest + outputs: + new_version: ${{ steps.release.outputs.new_version }} + suffix: ${{ steps.release.outputs.suffix }} + tag_name: ${{ steps.release.outputs.tag_name }} + steps: + - uses: actions/checkout@v4 + + - name: Extract tag and Details + id: release + run: | + if [ "${{ github.ref_type }}" = "tag" ]; then + TAG_NAME=${GITHUB_REF#refs/tags/} + NEW_VERSION=$(echo $TAG_NAME | awk -F'-' '{print $1}') + SUFFIX=$(echo $TAG_NAME | grep -oP '[a-z]+[0-9]+' || echo "") + echo "new_version=$NEW_VERSION" >> "$GITHUB_OUTPUT" + echo "suffix=$SUFFIX" >> "$GITHUB_OUTPUT" + echo "tag_name=$TAG_NAME" >> "$GITHUB_OUTPUT" + else + echo "No tag found" + exit 1 + fi + + check_pypi: + needs: details + runs-on: ubuntu-latest + steps: + - name: Fetch PyPI version + run: | + response=$(curl -s https://pypi.org/pypi/${{ env.PACKAGE_NAME }}/json || echo "{}") + latest_version=$(echo $response | jq -r '.info.version // "0.0.0"') + echo "latest_version=$latest_version" >> $GITHUB_ENV + + - name: Compare versions + run: | + if [ "$(printf '%s\n' "$latest_version" "${{ needs.details.outputs.new_version }}" | sort -rV | head -n1)" != "${{ needs.details.outputs.new_version }}" ]; then + echo "Version ${{ needs.details.outputs.new_version }} is not newer than PyPI version $latest_version" + exit 1 + fi + + setup_and_build: + needs: [details, check_pypi] + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: "3.10" # Changed from 3.13 to stable version + + - name: Install Poetry + uses: snok/install-poetry@v1 + with: + virtualenvs-create: true + virtualenvs-in-project: true + + - name: Configure Poetry + run: | + poetry config virtualenvs.in-project true + poetry config virtualenvs.create true + + - name: Set version + run: poetry version ${{ needs.details.outputs.new_version }} + + - name: Install dependencies + run: poetry install --sync --no-interaction --no-root + + - name: Build package + run: poetry build + + - name: Upload artifacts + uses: actions/upload-artifact@v4 + with: + name: dist + path: dist/* + + pypi_publish: + needs: setup_and_build + runs-on: ubuntu-latest + environment: + name: release + permissions: + id-token: write # Essential for trusted publishing + steps: + - uses: actions/download-artifact@v4 + with: + name: dist + path: dist/ + + - name: Publish to PyPI + uses: pypa/gh-action-pypi-publish@release/v1 + with: + verbose: true # For better debugging + + github_release: + needs: [details, setup_and_build, pypi_publish] + runs-on: ubuntu-latest + permissions: + contents: write + steps: + - uses: actions/checkout@v4 + with: + fetch-depth: 0 + + - uses: actions/download-artifact@v4 + with: + name: dist + path: dist/ + + - name: Create Release + uses: softprops/action-gh-release@v2 + with: + tag_name: ${{ needs.details.outputs.tag_name }} + files: | + dist/* + generate_release_notes: true \ No newline at end of file diff --git a/MANIFEST.in b/MANIFEST.in new file mode 100644 index 00000000..c9e3e5ab --- /dev/null +++ b/MANIFEST.in @@ -0,0 +1,5 @@ +include requirements.txt +include README.md +recursive-include chordspy/templates * +recursive-include chordspy/static * +recursive-include chordspy/config * \ No newline at end of file diff --git a/README.md b/README.md index a149400f..1d96121e 100644 --- a/README.md +++ b/README.md @@ -1,46 +1,46 @@ # Chords - Python -Chords- Python is a bag of tools designed to interface with Micro-controller development boards running [Chords Arduino Firmware](https://github.com/upsidedownlabs/Chords-Arduino-Firmware).Use Upside Down Labs bio-potential amplifiers to read data, visualize it, record data in CSV Files, and stream it via Lab Streaming Layer. +Chords- Python is an open-source bag of tools designed to interface with Micro-controller development boards running [Chords Arduino Firmware](https://github.com/upsidedownlabs/Chords-Arduino-Firmware). Use Upside Down Labs bio-potential amplifiers to read data, visualize it, record data in CSV Files, and stream it via Lab Streaming Layer. > [!NOTE] -> **Firmware Required:** -> - For Arduino: [Chords Arduino Firmware](https://github.com/upsidedownlabs/Chords-Arduino-Firmware) +> **Firmware Required for Arduino:** [Chords Arduino Firmware](https://github.com/upsidedownlabs/Chords-Arduino-Firmware) ## Features - **Multiple Protocols**: Supports `Wi-Fi`, `Bluetooth`, and `Serial` communication. -- **LSL Data Streaming**:Once the LSL stream starts, any PC on the same Wi-Fi network can access the data using tools like BrainVision LSL Viewer. +- **LSL Data Streaming**: Once the LSL stream starts, any PC on the same Wi-Fi network can access the data using tools like BrainVision LSL Viewer. - **CSV Logging**: Save raw data with Counter - **GUI**: Live plotting for all channels. -- **Applications**: EEG/ECG/EMG/EOG-based games and utilities (e.g., Tug of War, Keystroke Emulator). - -## Installation -1. **Python**: Ensure Latest version of Python is installed. -2. **Virtual Environment**: - ```bash - python -m venv venv - source venv/bin/activate # Linux/macOS - .\venv\Scripts\activate # Windows - ``` -3. **Dependencies**: - ```bash - pip install -r requirements.txt - ``` - -> [!IMPORTANT] -> On Windows, if scripts are blocked, run: -> ```powershell -> Set-ExecutionPolicy Unrestricted -Scope Process -> ``` +- **Applications**: EEG/ECG/EMG/EOG-based games and utilities (e.g., Tug of War, Keystroke Emulator). + + +## Installation + +- Make sure you have the latest version of Python installed. + +- Open command prompt and run: +```bash +python -m venv venv +``` + +```bash +venv\Scripts\activate # For Windows +source venv/bin/activate # For MacOS/Linux +``` + +```bash +pip install chordspy +``` ## Usage -Run the script and access the web interface: +Run the command and access the web interface: ```bash -python app.py +chordspy ``` + **Web Interface Preview**: -![Web Interface Screenshot](./media/Interface.png) +![Web Interface Screenshot](./chordspy/media/Interface.png) -![Web Interface Screenshot](./media/Webinterface.png) +![Web Interface Screenshot](./chordspy/media/Webinterface.png) ### Key Options: diff --git a/app.py b/app.py deleted file mode 100644 index 29be055b..00000000 --- a/app.py +++ /dev/null @@ -1,244 +0,0 @@ -from flask import Flask, render_template, request, jsonify -from connection import Connection -import threading -import asyncio -import logging -from bleak import BleakScanner -from flask import Response -import queue -import yaml -from pathlib import Path -import os - -console_queue = queue.Queue() -app = Flask(__name__) -logging.basicConfig(level=logging.INFO) - -# Global variables -connection_manager = None -connection_thread = None -ble_devices = [] -stream_active = False -running_apps = {} # Dictionary to track running apps - -@app.route('/log_error', methods=['POST']) -def log_error(): - try: - error_data = request.get_json() - if not error_data or 'error' not in error_data or 'log_error' in str(error_data): - return jsonify({'status': 'error', 'message': 'Invalid data'}), 400 - - os.makedirs('logs', exist_ok=True) - - with open('logs/logging.txt', 'a') as f: - f.write(error_data['error']) - - return jsonify({'status': 'success'}) - except Exception as e: - return jsonify({'status': 'error', 'message': 'Logging failed'}), 500 - -def run_async(coro): - def wrapper(*args, **kwargs): - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - try: - return loop.run_until_complete(coro(*args, **kwargs)) - finally: - loop.close() - return wrapper - -@app.route('/') -def index(): - return render_template('index.html') - -@app.route('/get_apps_config') -def get_apps_config(): - try: - config_path = Path('config') / 'apps.yaml' - if config_path.exists(): - with open(config_path, 'r') as file: - config = yaml.safe_load(file) - return jsonify(config) - return jsonify - - except Exception as e: - logging.error(f"Error loading apps config: {str(e)}") - return jsonify - -@app.route('/scan_ble') -@run_async -async def scan_ble_devices(): - global ble_devices - try: - devices = await BleakScanner.discover(timeout=5) - ble_devices = [{'name': d.name or 'Unknown', 'address': d.address} - for d in devices if d.name and d.name.startswith(('NPG', 'npg'))] - return jsonify({'status': 'success', 'devices': ble_devices}) - except Exception as e: - logging.error(f"BLE scan error: {str(e)}") - return jsonify({'status': 'error', 'message': str(e)}), 500 - -@app.route('/check_stream') -def check_stream(): - is_connected = connection_manager.stream_active if hasattr(connection_manager, 'stream_active') else False - return jsonify({'connected': is_connected}) - -@app.route('/check_connection') -def check_connection(): - if connection_manager and connection_manager.stream_active: - return jsonify({'status': 'connected'}) - return jsonify({'status': 'connecting'}) - -def post_console_message(message): - global stream_active - if "LSL stream started" in message: - stream_active = True - elif "disconnected" in message: - stream_active = False - console_queue.put(message) - -@app.route('/console_updates') -def console_updates(): - def event_stream(): - while True: - message = console_queue.get() - yield f"data: {message}\n\n" - - return Response(event_stream(), mimetype="text/event-stream") - -@app.route('/launch_app', methods=['POST']) -def launch_application(): - if not connection_manager or not connection_manager.stream_active: - return jsonify({'status': 'error', 'message': 'No active stream'}), 400 - - data = request.get_json() - app_name = data.get('app') - - if not app_name: - return jsonify({'status': 'error', 'message': 'No application specified'}), 400 - - # Check if app is already running - if app_name in running_apps and running_apps[app_name].poll() is None: - return jsonify({'status': 'error', 'message': f'{app_name} is already running','code': 'ALREADY_RUNNING'}), 400 - - try: - import subprocess - import sys - - python_exec = sys.executable - process = subprocess.Popen([python_exec, f"{app_name}.py"]) - running_apps[app_name] = process - - return jsonify({'status': 'success', 'message': f'Launched {app_name}'}) - except Exception as e: - logging.error(f"Error launching {app_name}: {str(e)}") - return jsonify({'status': 'error', 'message': str(e)}), 500 - -@app.route('/check_app_status/') -def check_app_status(app_name): - if app_name in running_apps: - if running_apps[app_name].poll() is None: # Still running - return jsonify({'status': 'running'}) - else: # Process has terminated - del running_apps[app_name] - return jsonify({'status': 'not_running'}) - return jsonify({'status': 'not_running'}) - -@app.route('/connect', methods=['POST']) -def connect_device(): - global connection_manager, connection_thread, stream_active - - data = request.get_json() - protocol = data.get('protocol') - device_address = data.get('device_address') - - # Reset stream status - stream_active = False - - # Clean up any existing connection - if connection_manager: - connection_manager.cleanup() - if connection_thread and connection_thread.is_alive(): - connection_thread.join() - - # Create new connection - connection_manager = Connection() - - def run_connection(): - try: - if protocol == 'usb': - success = connection_manager.connect_usb() - elif protocol == 'wifi': - success = connection_manager.connect_wifi() - elif protocol == 'ble': - if not device_address: - logging.error("No BLE device address provided") - return - - # For BLE, we need to run in an event loop - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - success = connection_manager.connect_ble(device_address) - - if success: - post_console_message("LSL stream started") - else: - post_console_message("Connection failed") - except Exception as e: - logging.error(f"Connection error: {str(e)}") - post_console_message(f"Connection error: {str(e)}") - - # Start connection in a separate thread - connection_thread = threading.Thread(target=run_connection, daemon=True) - connection_thread.start() - - return jsonify({'status': 'connecting', 'protocol': protocol}) - -@app.route('/disconnect', methods=['POST']) -def disconnect_device(): - global connection_manager, stream_active - if connection_manager: - connection_manager.cleanup() - stream_active = False - post_console_message("disconnected") - return jsonify({'status': 'disconnected'}) - return jsonify({'status': 'no active connection'}) - -@app.route('/start_recording', methods=['POST']) -def start_recording(): - global connection_manager - if not connection_manager: - return jsonify({'status': 'error', 'message': 'No active connection'}), 400 - - data = request.get_json() - filename = data.get('filename') - - # If filename is empty or None, let connection_manager use default - if filename == "": - filename = None - - try: - if connection_manager.start_csv_recording(filename): - post_console_message(f"Recording started: {filename or 'default filename'}") - return jsonify({'status': 'recording_started'}) - return jsonify({'status': 'error', 'message': 'Failed to start recording'}), 500 - except Exception as e: - logging.error(f"Recording error: {str(e)}") - return jsonify({'status': 'error', 'message': str(e)}), 500 - -@app.route('/stop_recording', methods=['POST']) -def stop_recording(): - global connection_manager - if connection_manager: - try: - if connection_manager.stop_csv_recording(): - post_console_message("Recording stopped") - return jsonify({'status': 'recording_stopped'}) - return jsonify({'status': 'error', 'message': 'Failed to stop recording'}), 500 - except Exception as e: - logging.error(f"Stop recording error: {str(e)}") - return jsonify({'status': 'error', 'message': str(e)}), 500 - return jsonify({'status': 'error', 'message': 'No active connection'}), 400 - -if __name__ == "__main__": - app.run(debug=True) \ No newline at end of file diff --git a/app_requirements.txt b/app_requirements.txt deleted file mode 100644 index 45f8c6ec..00000000 --- a/app_requirements.txt +++ /dev/null @@ -1,13 +0,0 @@ -pyqtgraph==0.13.7 -PyQt5==5.15.11 -keyboard==0.13.5 -scipy==1.14.1 -pygame==2.6.1 -neurokit2==0.2.10 -plotly==5.24.1 -pandas==2.2.3 -tk==0.1.0 -PyAutoGUI==0.9.54 -Flask==3.1.1 -psutil==6.1.1 -websocket-client==1.8.0 \ No newline at end of file diff --git a/chords_requirements.txt b/chords_requirements.txt deleted file mode 100644 index dca1001b..00000000 --- a/chords_requirements.txt +++ /dev/null @@ -1,4 +0,0 @@ -numpy==2.1.3 -pylsl==1.16.2 -pyserial==3.5 -bleak==0.22.3 \ No newline at end of file diff --git a/chords_serial.py b/chords_serial.py deleted file mode 100644 index bbbe4dd2..00000000 --- a/chords_serial.py +++ /dev/null @@ -1,156 +0,0 @@ -import serial -import time -import numpy as np -import serial.tools.list_ports -import sys -import signal -import threading - -class Chords_USB: - SYNC_BYTE1 = 0xc7 - SYNC_BYTE2 = 0x7c - END_BYTE = 0x01 - HEADER_LENGTH = 3 - - supported_boards = { - "UNO-R3": {"sampling_rate": 250, "Num_channels": 6}, - "UNO-CLONE": {"sampling_rate": 250, "Num_channels": 6}, - "GENUINO-UNO": {"sampling_rate": 250, "Num_channels": 6}, - "UNO-R4": {"sampling_rate": 500, "Num_channels": 6}, - "RPI-PICO-RP2040": {"sampling_rate": 500, "Num_channels": 3}, - "NANO-CLONE": {"sampling_rate": 250, "Num_channels": 8}, - "NANO-CLASSIC": {"sampling_rate": 250, "Num_channels": 8}, - "STM32F4-BLACK-PILL": {"sampling_rate": 500, "Num_channels": 8}, - "STM32G4-CORE-BOARD": {"sampling_rate": 500, "Num_channels": 16}, - "MEGA-2560-R3": {"sampling_rate": 250, "Num_channels": 16}, - "MEGA-2560-CLONE": {"sampling_rate": 250, "Num_channels": 16}, - "GIGA-R1": {"sampling_rate": 500, "Num_channels": 6}, - "NPG-LITE": {"sampling_rate": 500, "Num_channels": 3}, - } - - def __init__(self): - self.ser = None - self.buffer = bytearray() - self.retry_limit = 4 - self.packet_length = None - self.num_channels = None - self.data = None - self.board = "" - - # Only install signal handler in the main thread - if threading.current_thread() is threading.main_thread(): - signal.signal(signal.SIGINT, self.signal_handler) - - def connect_hardware(self, port, baudrate, timeout=1): - try: - self.ser = serial.Serial(port, baudrate=baudrate, timeout=timeout) - retry_counter = 0 - response = None - - while retry_counter < self.retry_limit: - self.ser.write(b'WHORU\n') - try: - response = self.ser.readline().strip().decode() - except UnicodeDecodeError: - response = None - - if response in self.supported_boards: - self.board = response - print(f"{response} detected at {port} with baudrate {baudrate}") - self.num_channels = self.supported_boards[self.board]["Num_channels"] - sampling_rate = self.supported_boards[self.board]["sampling_rate"] - self.packet_length = (2 * self.num_channels) + self.HEADER_LENGTH + 1 - self.data = np.zeros((self.num_channels, 2000)) - return True - - retry_counter += 1 - - self.ser.close() - except Exception as e: - print(f"Connection Error: {e}") - return False - - def detect_hardware(self, timeout=1): - baudrates = [230400, 115200] - ports = serial.tools.list_ports.comports() - - for port in ports: - for baud in baudrates: - print(f"Trying {port.device} at {baud}...") - if self.connect_hardware(port.device, baud, timeout): - return True - - print("Unable to detect supported hardware.") - return False - - def send_command(self, command): - if self.ser and self.ser.is_open: - self.ser.flushInput() - self.ser.flushOutput() - self.ser.write(f"{command}\n".encode()) - time.sleep(0.1) - response = self.ser.readline().decode('utf-8', errors='ignore').strip() - return response - return None - - def read_data(self): - try: - raw_data = self.ser.read(self.ser.in_waiting or 1) - if raw_data == b'': - raise serial.SerialException("Serial port disconnected or No data received.") - self.buffer.extend(raw_data) - - while len(self.buffer) >= self.packet_length: - sync_index = self.buffer.find(bytes([self.SYNC_BYTE1, self.SYNC_BYTE2])) - if sync_index == -1: - self.buffer.clear() - continue - - if len(self.buffer) >= sync_index + self.packet_length: - packet = self.buffer[sync_index:sync_index + self.packet_length] - if packet[0] == self.SYNC_BYTE1 and packet[1] == self.SYNC_BYTE2 and packet[-1] == self.END_BYTE: - channel_data = [] - - for ch in range(self.num_channels): - high_byte = packet[2 * ch + self.HEADER_LENGTH] - low_byte = packet[2 * ch + self.HEADER_LENGTH + 1] - value = (high_byte << 8) | low_byte - channel_data.append(float(value)) - - self.data = np.roll(self.data, -1, axis=1) - self.data[:, -1] = channel_data - del self.buffer[:sync_index + self.packet_length] - else: - del self.buffer[:sync_index + 1] - except serial.SerialException: - self.cleanup() - - def start_streaming(self): - self.send_command('START') - self.streaming_active = True - try: - while self.streaming_active: - self.read_data() - except KeyboardInterrupt: - print("KeyboardInterrupt received.") - self.cleanup() - - def stop_streaming(self): - self.streaming_active = False - self.send_command('STOP') - - def cleanup(self): - self.stop_streaming() - try: - if self.ser and self.ser.is_open: - self.ser.close() - except Exception as e: - print(f"Error during cleanup: {e}") - - def signal_handler(self, sig, frame): - self.cleanup() - -if __name__ == "__main__": - client = Chords_USB() - client.detect_hardware() - client.start_streaming() \ No newline at end of file diff --git a/chordspy/__init__.py b/chordspy/__init__.py new file mode 100644 index 00000000..6867a364 --- /dev/null +++ b/chordspy/__init__.py @@ -0,0 +1,2 @@ +from chordspy.app import main +from chordspy.connection import Connection \ No newline at end of file diff --git a/chordspy/app.py b/chordspy/app.py new file mode 100644 index 00000000..96765da2 --- /dev/null +++ b/chordspy/app.py @@ -0,0 +1,368 @@ +""" +Flask-based web interface for managing connections to devices and applications. +This module provides a web-based GUI for: +- Scanning and connecting to devices via USB, WiFi, or BLE +- Managing data streaming and recording +- Launching and monitoring Chords-Python applications +- Displaying real-time console updates +- Handling error logging +The application uses Server-Sent Events (SSE) for real-time updates to the frontend. +""" + +# Importing Necessary Libraries +from flask import Flask, render_template, request, jsonify # Flask web framework +from chordspy.connection import Connection # Connection management module +import threading # For running connection management in a separate thread +import asyncio # For asynchronous operations, especially with BLE +import logging # For logging errors and information +from bleak import BleakScanner # BLE device scanner from Bleak library +from flask import Response # For handling server-sent events (SSE) +import queue # Queue for managing console messages +import yaml # For loading application configuration from YAML files +from pathlib import Path # For handling file paths in a platform-independent way +import os # For file and directory operations +import webbrowser # For opening the web interface in a browser + +console_queue = queue.Queue() # Global queue for console messages to be displayed in the web interface +app = Flask(__name__) # Initialize Flask application +logging.basicConfig(level=logging.INFO) # Configure logging +log = logging.getLogger('werkzeug') +log.setLevel(logging.ERROR) # Only show errors from Werkzeug (Flask's WSGI) + +# Global variables +connection_manager = None # Manages the device connection +connection_thread = None # Thread for connection management +ble_devices = [] # List of discovered BLE devices +stream_active = False # Flag indicating if data stream is active +running_apps = {} # Dictionary to track running applications + +# Error logging endpoint. This allows the frontend to send error messages to be logged. +@app.route('/log_error', methods=['POST']) +def log_error(): + """ + Endpoint for logging errors from the frontend. It receives error data via POST request and writes it to a log file. + Returns: + JSON response with status and optional error message. + """ + try: + error_data = request.get_json() + if not error_data or 'error' not in error_data or 'log_error' in str(error_data): + return jsonify({'status': 'error', 'message': 'Invalid data'}), 400 + + os.makedirs('logs', exist_ok=True) # Ensure logs directory exists + + with open('logs/logging.txt', 'a') as f: # Append error to log file + f.write(error_data['error']) + + return jsonify({'status': 'success'}) + except Exception as e: + return jsonify({'status': 'error', 'message': 'Logging failed'}), 500 + +# Decorator to run async functions in a synchronous context. It allows us to call async functions from Flask routes. +def run_async(coro): + """ + Decorator to run async functions in a synchronous context. + Args: + coro: The coroutine to be executed. + Returns: + A wrapper function that runs the coroutine in a new event loop. + """ + def wrapper(*args, **kwargs): + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: + return loop.run_until_complete(coro(*args, **kwargs)) + finally: + loop.close() + return wrapper + +# Main route for the web interface. It renders the index.html template. +@app.route('/') +def index(): + """Render the main index page of the web interface.""" + return render_template('index.html') + +# Route to retrieve the configuration for available Chord-Python applications. +@app.route('/get_apps_config') +def get_apps_config(): + """ + Retrieve the configuration for available applications.It looks for apps.yaml in either the package config directory or a local config directory. + Returns: + JSON response containing the application configuration or an empty list if not found. + """ + try: + config_path = Path(__file__).parent / 'config' / 'apps.yaml' # Try package-relative path first + if not config_path.exists(): + config_path = Path('chordspy.config') / 'apps.yaml' # Fallback to local path + + if config_path.exists(): + with open(config_path, 'r') as file: + config = yaml.safe_load(file) + return jsonify(config) + return jsonify({'apps': []}) + + except Exception as e: + logging.error(f"Error loading apps config: {str(e)}") + return jsonify({'apps': [], 'error': str(e)}) + +# Route to scan for nearby BLE devices. It uses BleakScanner to discover devices. +@app.route('/scan_ble') +@run_async +async def scan_ble_devices(): + """ + Scan for nearby BLE devices. It uses BleakScanner to discover devices for 5 seconds and filters for devices with names starting with 'NPG' or 'npg'. + Returns: + JSON response with list of discovered devices or error message. + """ + global ble_devices + try: + devices = await BleakScanner.discover(timeout=5) + ble_devices = [{'name': d.name or 'Unknown', 'address': d.address} + for d in devices if d.name and d.name.startswith(('NPG', 'npg'))] + return jsonify({'status': 'success', 'devices': ble_devices}) + except Exception as e: + logging.error(f"BLE scan error: {str(e)}") + return jsonify({'status': 'error', 'message': str(e)}), 500 + +# Route to check if the data stream is currently active. It checks the connection manager's stream_active flag. +@app.route('/check_stream') +def check_stream(): + """ + Check if data stream is currently active. + Returns: + JSON response with connection status. + """ + is_connected = connection_manager.stream_active if hasattr(connection_manager, 'stream_active') else False + return jsonify({'connected': is_connected}) + +# Route to check the current connection status with the device. It returns 'connected' if the stream is active, otherwise 'connecting'. +@app.route('/check_connection') +def check_connection(): + """ + Check the current connection status with the device. + Returns: + JSON response with connection status ('connected' or 'connecting'). + """ + if connection_manager and connection_manager.stream_active: + return jsonify({'status': 'connected'}) + return jsonify({'status': 'connecting'}) + +# Function to post messages to the console queue. It updates the stream_active flag based on the message content. This function is used to send messages to the web interface for display in real-time. +def post_console_message(message): + """ + Post a message to the console queue for display in the web interface and updates the stream_active flag based on message content. + Args: + message: The message to be displayed in the console. + """ + global stream_active + if "LSL stream started" in message: + stream_active = True + elif "disconnected" in message: + stream_active = False + console_queue.put(message) + +# Route for Server-Sent Events (SSE) to provide real-time console updates to the web interface. +@app.route('/console_updates') +def console_updates(): + """ + Server-Sent Events (SSE) endpoint for real-time console updates. + Returns: + SSE formatted messages from the console queue. + """ + def event_stream(): + """Generator function that yields messages from the console queue as SSE formatted messages.""" + while True: + message = console_queue.get() + yield f"data: {message}\n\n" + + return Response(event_stream(), mimetype="text/event-stream") + +# Route to launch Chord-Python application as a subprocess. It receives the application name via POST request and starts it as a Python module. +@app.route('/launch_app', methods=['POST']) +def launch_application(): + """ + Launch a Chord-Python application as a subprocess.It receives the application name via POST request and starts it as a Python module. + Returns: + JSON response indicating success or failure of application launch. + """ + if not connection_manager or not connection_manager.stream_active: + return jsonify({'status': 'error', 'message': 'No active stream'}), 400 + + data = request.get_json() + module_name = data.get('app') + + if not module_name: + return jsonify({'status': 'error', 'message': 'No application specified'}), 400 + + # Check if app is already running + if module_name in running_apps and running_apps[module_name].poll() is None: + return jsonify({'status': 'error', 'message': f'{module_name} is already running','code': 'ALREADY_RUNNING'}), 400 + + try: + import subprocess + import sys + + # Run the module using Python's -m flag + process = subprocess.Popen([sys.executable, "-m", f"chordspy.{module_name}"]) + running_apps[module_name] = process # Track running application + + return jsonify({'status': 'success', 'message': f'Launched {module_name}'}) + except Exception as e: + logging.error(f"Error launching {module_name}: {str(e)}") + return jsonify({'status': 'error', 'message': str(e)}), 500 + +# Route to check the status of a running application. It checks if the application is in the running_apps dictionary and whether its process is still active. +@app.route('/check_app_status/') +def check_app_status(app_name): + """ + Check the status of a running application. + Args: + app_name: Name of the application to check. + Returns: + JSON response indicating if the application is running or not. + """ + if app_name in running_apps: + if running_apps[app_name].poll() is None: # Still running + return jsonify({'status': 'running'}) + else: # Process has terminated + del running_apps[app_name] + return jsonify({'status': 'not_running'}) + return jsonify({'status': 'not_running'}) + +# Route to connect to a device using the specified protocol. It supports USB, WiFi, and BLE connections. Starts connection in a separate thread. +@app.route('/connect', methods=['POST']) +def connect_device(): + """ + Establish connection to a device using the specified protocol.It supports USB, WiFi, and BLE connections. Starts connection in a separate thread. + Returns: + JSON response indicating connection status. + """ + global connection_manager, connection_thread, stream_active + + data = request.get_json() + protocol = data.get('protocol') + device_address = data.get('device_address') + + # Reset stream status + stream_active = False + + # Clean up any existing connection + if connection_manager: + connection_manager.cleanup() + if connection_thread and connection_thread.is_alive(): + connection_thread.join() + + # Create new connection + connection_manager = Connection() + + def run_connection(): + """ + Internal function to handle the connection process in a thread. + """ + try: + if protocol == 'usb': + success = connection_manager.connect_usb() + elif protocol == 'wifi': + success = connection_manager.connect_wifi() + elif protocol == 'ble': + if not device_address: + logging.error("No BLE device address provided") + return + + # For BLE, we need to run in an event loop + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + success = connection_manager.connect_ble(device_address) + + if success: + post_console_message("LSL stream started") + else: + post_console_message("Connection failed") + except Exception as e: + logging.error(f"Connection error: {str(e)}") + post_console_message(f"Connection error: {str(e)}") + + # Start connection in a separate thread + connection_thread = threading.Thread(target=run_connection, daemon=True) + connection_thread.start() + + return jsonify({'status': 'connecting', 'protocol': protocol}) + +# Route to disconnect from the currently connected device. It cleans up the connection manager and resets the stream status. +@app.route('/disconnect', methods=['POST']) +def disconnect_device(): + """ + Disconnect from the currently connected device. + Returns: + JSON response indicating disconnection status. + """ + global connection_manager, stream_active + if connection_manager: + connection_manager.cleanup() + stream_active = False + post_console_message("disconnected") + return jsonify({'status': 'disconnected'}) + return jsonify({'status': 'no active connection'}) + +# Route to start recording data from the connected device to a CSV file. +@app.route('/start_recording', methods=['POST']) +def start_recording(): + """ + Start recording data from the connected device to a CSV file. + Returns: + JSON response indicating recording status. + """ + global connection_manager + if not connection_manager: + return jsonify({'status': 'error', 'message': 'No active connection'}), 400 + + data = request.get_json() + filename = data.get('filename') + + # If filename is empty or None, let connection_manager use default + if filename == "": + filename = None + + try: + if connection_manager.start_csv_recording(filename): + post_console_message(f"Recording started: {filename or 'default filename'}") + return jsonify({'status': 'recording_started'}) + return jsonify({'status': 'error', 'message': 'Failed to start recording'}), 500 + except Exception as e: + logging.error(f"Recording error: {str(e)}") + return jsonify({'status': 'error', 'message': str(e)}), 500 + +# Route to stop the current recording session. It calls the stop_csv_recording method of the connection manager. +@app.route('/stop_recording', methods=['POST']) +def stop_recording(): + """ + Stop the current recording session. + Returns: + JSON response indicating recording stop status. + """ + global connection_manager + if connection_manager: + try: + if connection_manager.stop_csv_recording(): + post_console_message("Recording stopped") + return jsonify({'status': 'recording_stopped'}) + return jsonify({'status': 'error', 'message': 'Failed to stop recording'}), 500 + except Exception as e: + logging.error(f"Stop recording error: {str(e)}") + return jsonify({'status': 'error', 'message': str(e)}), 500 + return jsonify({'status': 'error', 'message': 'No active connection'}), 400 + +# Route to check if a specific application is running. It checks the running_apps dictionary for the application's process. +def main(): + """ + Main entry point for the application. It starts the Flask server and opens the web browser to the application. + """ + def open_browser(): + """Open the default web browser to the application URL.""" + webbrowser.open("http://localhost:5000") + + threading.Timer(1, open_browser).start() # Open browser after 1 seconds to allow server to start + app.run(debug=True, use_reloader=False, host='0.0.0.0', port=5000) # Start Flask application + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/beetle.py b/chordspy/beetle.py similarity index 98% rename from beetle.py rename to chordspy/beetle.py index f9d52c1d..b58e59b9 100644 --- a/beetle.py +++ b/chordspy/beetle.py @@ -55,7 +55,7 @@ calibration_duration = 10 sprite_count = 10 -beetle_sprites = [pygame.image.load(f'media/Beetle{i}.png') for i in range(1, sprite_count + 1)] +beetle_sprites = [pygame.image.load(f'C:/Users/PAYAL/Desktop/Chords-Python/chordspy/media/Beetle{i}.png') for i in range(1, sprite_count + 1)] beetle_sprites = [pygame.transform.smoothscale(sprite, (140, 160)) for sprite in beetle_sprites] # Animation Variables diff --git a/chords_ble.py b/chordspy/chords_ble.py similarity index 57% rename from chords_ble.py rename to chordspy/chords_ble.py index 2267c259..92024af1 100644 --- a/chords_ble.py +++ b/chordspy/chords_ble.py @@ -1,3 +1,8 @@ +""" +This scripts scan and then connects to the selected devices via BLE, reads data packets, processes them, and handles connection status. +""" + +# Importing necessary libraries import asyncio from bleak import BleakScanner, BleakClient import time @@ -6,38 +11,56 @@ import threading class Chords_BLE: + """ + A class to handle BLE communication with NPG devices via BLE. + This class provides functionality to: + - Scan for compatible BLE devices + - Connect to a device + - Receive and process data packets + - Monitor connection status + - Handle disconnections and errors + """ + # Class constants - DEVICE_NAME_PREFIX = "NPG" - SERVICE_UUID = "4fafc201-1fb5-459e-8fcc-c5c9c331914b" - DATA_CHAR_UUID = "beb5483e-36e1-4688-b7f5-ea07361b26a8" - CONTROL_CHAR_UUID = "0000ff01-0000-1000-8000-00805f9b34fb" + DEVICE_NAME_PREFIX = "NPG" # Prefix for compatible device names + SERVICE_UUID = "4fafc201-1fb5-459e-8fcc-c5c9c331914b" # UUID for the BLE service + DATA_CHAR_UUID = "beb5483e-36e1-4688-b7f5-ea07361b26a8" # UUID for data characteristic + CONTROL_CHAR_UUID = "0000ff01-0000-1000-8000-00805f9b34fb" # UUID for control characteristic # Packet parameters - NUM_CHANNELS = 3 - SINGLE_SAMPLE_LEN = (NUM_CHANNELS * 2) + 1 # (1 Counter + Num_Channels * 2 bytes) + NUM_CHANNELS = 3 # Number of channels + SINGLE_SAMPLE_LEN = (NUM_CHANNELS * 2) + 1 # (1 Counter + Num_Channels * 2 bytes) BLOCK_COUNT = 10 - NEW_PACKET_LEN = SINGLE_SAMPLE_LEN * BLOCK_COUNT + NEW_PACKET_LEN = SINGLE_SAMPLE_LEN * BLOCK_COUNT # Total length of a data packet def __init__(self): - self.prev_unrolled_counter = None - self.samples_received = 0 - self.start_time = None - self.total_missing_samples = 0 - self.last_received_time = None - self.DATA_TIMEOUT = 2.0 - self.client = None - self.monitor_task = None - self.print_rate_task = None - self.running = False - self.loop = None - self.connection_event = threading.Event() - self.stop_event = threading.Event() + """ + Initialize the BLE client with default values and state variables. + """ + self.prev_unrolled_counter = None # Tracks the last sample counter value + self.samples_received = 0 # Count of received samples + self.start_time = None # Timestamp when first sample is received + self.total_missing_samples = 0 # Count of missing samples + self.last_received_time = None # Timestamp of last received data + self.DATA_TIMEOUT = 2.0 # Timeout period for data reception (seconds) + self.client = None # BLE client instance + self.monitor_task = None # Task for monitoring connection + self.print_rate_task = None # Task for printing sample rate + self.running = False # Flag indicating if client is running + self.loop = None # Asyncio event loop + self.connection_event = threading.Event() # Event for connection status + self.stop_event = threading.Event() # Event for stopping operations @classmethod async def scan_devices(cls): + """ + Scan for BLE devices with the NPG prefix. + Returns: + list: A list of discovered devices matching the NPG prefix + """ print("Scanning for BLE devices...") devices = await BleakScanner.discover() - filtered = [d for d in devices if d.name and d.name.startswith(cls.DEVICE_NAME_PREFIX)] + filtered = [d for d in devices if d.name and d.name.startswith(cls.DEVICE_NAME_PREFIX)] # Filter devices by name prefix if not filtered: print("No NPG devices found.") @@ -46,13 +69,19 @@ async def scan_devices(cls): return filtered def process_sample(self, sample_data: bytearray): - """Process a single EEG sample packet""" + """ + Process a single sample packet. + Args: + sample_data (bytearray): The raw sample data to process + """ self.last_received_time = time.time() + # Validate sample length if len(sample_data) != self.SINGLE_SAMPLE_LEN: print("Unexpected sample length:", len(sample_data)) return + # Extract and process sample counter sample_counter = sample_data[0] if self.prev_unrolled_counter is None: self.prev_unrolled_counter = sample_counter @@ -63,6 +92,7 @@ def process_sample(self, sample_data: bytearray): else: current_unrolled = self.prev_unrolled_counter - last + sample_counter + # Check for missing samples if current_unrolled != self.prev_unrolled_counter + 1: missing = current_unrolled - (self.prev_unrolled_counter + 1) print(f"Missing {missing} sample(s)") @@ -70,34 +100,47 @@ def process_sample(self, sample_data: bytearray): self.prev_unrolled_counter = current_unrolled + # Record start time if this is the first sample if self.start_time is None: self.start_time = time.time() + # Extract channel data (2 bytes per channel, big-endian, signed) channels = [int.from_bytes(sample_data[i:i+2], byteorder='big', signed=True) for i in range(1, len(sample_data), 2)] self.samples_received += 1 def notification_handler(self, sender, data: bytearray): - """Handle incoming notifications from the BLE device""" + """ + Handle incoming notifications from the BLE device. + Args: + sender: The characteristic that sent the notification + data (bytearray): The received data packet + """ try: - if len(data) == self.NEW_PACKET_LEN: - for i in range(0, self.NEW_PACKET_LEN, self.SINGLE_SAMPLE_LEN): + if len(data) == self.NEW_PACKET_LEN: # Process data based on packet length + for i in range(0, self.NEW_PACKET_LEN, self.SINGLE_SAMPLE_LEN): # Process a block of samples self.process_sample(data[i:i+self.SINGLE_SAMPLE_LEN]) elif len(data) == self.SINGLE_SAMPLE_LEN: - self.process_sample(data) + self.process_sample(data) # Process a single sample else: print(f"Unexpected packet length: {len(data)} bytes") except Exception as e: print(f"Error processing data: {e}") async def print_rate(self): + """Print the current sample rate every second.""" while not self.stop_event.is_set(): await asyncio.sleep(1) self.samples_received = 0 async def monitor_connection(self): - """Monitor the connection status and check for data interruptions""" + """ + Monitor the connection status and check for data interruptions. + This runs in a loop to check: + - If data hasn't been received within the timeout period + - If the BLE connection has been lost + """ while not self.stop_event.is_set(): if self.last_received_time and (time.time() - self.last_received_time) > self.DATA_TIMEOUT: print("\nData Interrupted") @@ -112,6 +155,13 @@ async def monitor_connection(self): await asyncio.sleep(0.5) async def async_connect(self, device_address): + """ + Asynchronously connect to a BLE device and start data reception. + Args: + device_address (str): The MAC address of the device to connect to + Returns: + bool: True if connection was successful, otherwise False + """ try: print(f"Attempting to connect to {device_address}...") @@ -125,16 +175,20 @@ async def async_connect(self, device_address): print(f"Connected to {device_address}", flush=True) self.connection_event.set() + # Initialize monitoring tasks self.last_received_time = time.time() self.monitor_task = asyncio.create_task(self.monitor_connection()) self.print_rate_task = asyncio.create_task(self.print_rate()) + # Send start command to device await self.client.write_gatt_char(self.CONTROL_CHAR_UUID, b"START", response=True) print("Sent START command") + # Subscribe to data notifications await self.client.start_notify(self.DATA_CHAR_UUID, self.notification_handler) print("Subscribed to data notifications") + # Main loop self.running = True while self.running and not self.stop_event.is_set(): await asyncio.sleep(1) @@ -148,6 +202,7 @@ async def async_connect(self, device_address): await self.cleanup() async def cleanup(self): + """Clean up resources and disconnect from the device.""" if self.monitor_task: self.monitor_task.cancel() if self.print_rate_task: @@ -158,6 +213,11 @@ async def cleanup(self): self.connection_event.clear() def connect(self, device_address): + """ + Connect to a BLE device (wrapper for async_connect). + Args: + device_address (str): The MAC address of the device to connect to + """ self.loop = asyncio.new_event_loop() asyncio.set_event_loop(self.loop) @@ -171,12 +231,14 @@ def connect(self, device_address): self.loop.close() def stop(self): + """Stop all operations and clean up resources.""" self.stop_event.set() self.running = False if self.loop and self.loop.is_running(): self.loop.call_soon_threadsafe(self.loop.stop) def parse_args(): + """Parse command line arguments.""" parser = argparse.ArgumentParser() parser.add_argument("--scan", action="store_true", help="Scan for devices") parser.add_argument("--connect", type=str, help="Connect to device address") @@ -188,11 +250,11 @@ def parse_args(): try: if args.scan: - devices = asyncio.run(Chords_BLE.scan_devices()) + devices = asyncio.run(Chords_BLE.scan_devices()) # Scan for devices for dev in devices: print(f"DEVICE:{dev.name}|{dev.address}") elif args.connect: - client.connect(args.connect) + client.connect(args.connect) # Connect to specified device try: while client.running: time.sleep(1) diff --git a/chordspy/chords_serial.py b/chordspy/chords_serial.py new file mode 100644 index 00000000..98a999e9 --- /dev/null +++ b/chordspy/chords_serial.py @@ -0,0 +1,280 @@ +""" +CHORDS USB Data Acquisition Script - This script supports detecting, connecting to, and reading data from +supported development boards via USB. It handles device identification, serial communication, +data packet parsing, and streaming management. The system supports multiple microcontroller +boards with different sampling rates and channel configurations. +""" + +# Importing necessary libraries +import serial +import time +import numpy as np +import serial.tools.list_ports +import sys +import signal +import threading + +class Chords_USB: + """ + A class to interface with microcontroller development hardware for data acquisition. + This class handles communication with various supported microcontroller boards, data streaming, and packet parsing. It provides methods for hardware detection, connection, and data acquisition. + Attributes: + SYNC_BYTE1 (int): First synchronization byte for packet identification + SYNC_BYTE2 (int): Second synchronization byte for packet identification + END_BYTE (int): End byte marking the end of a packet + HEADER_LENGTH (int): Length of the packet header (sync bytes + counter) + supported_boards (dict): Dictionary of supported boards with their specifications + ser (serial.Serial): Serial connection object + buffer (bytearray): Buffer for incoming data + retry_limit (int): Maximum connection retries + packet_length (int): Expected packet length for current board + num_channels (int): Number of channels for current board + data (numpy.ndarray): Array for storing channel data + board (str): Detected board name + streaming_active (bool): Streaming state flag + """ + # Packet protocol constants + SYNC_BYTE1 = 0xc7 # First synchronization byte + SYNC_BYTE2 = 0x7c # Second synchronization byte + END_BYTE = 0x01 # End of packet marker + HEADER_LENGTH = 3 # Length of packet header (sync bytes + counter) + + # Supported boards with their sampling rate and Number of Channels + supported_boards = { + "UNO-R3": {"sampling_rate": 250, "Num_channels": 6}, + "UNO-CLONE": {"sampling_rate": 250, "Num_channels": 6}, + "GENUINO-UNO": {"sampling_rate": 250, "Num_channels": 6}, + "UNO-R4": {"sampling_rate": 500, "Num_channels": 6}, + "RPI-PICO-RP2040": {"sampling_rate": 500, "Num_channels": 3}, + "NANO-CLONE": {"sampling_rate": 250, "Num_channels": 8}, + "NANO-CLASSIC": {"sampling_rate": 250, "Num_channels": 8}, + "STM32F4-BLACK-PILL": {"sampling_rate": 500, "Num_channels": 8}, + "STM32G4-CORE-BOARD": {"sampling_rate": 500, "Num_channels": 16}, + "MEGA-2560-R3": {"sampling_rate": 250, "Num_channels": 16}, + "MEGA-2560-CLONE": {"sampling_rate": 250, "Num_channels": 16}, + "GIGA-R1": {"sampling_rate": 500, "Num_channels": 6}, + "NPG-LITE": {"sampling_rate": 500, "Num_channels": 3}, + } + + def __init__(self): + """ + Initialize the Chords_USB client and sets up serial connection attributes, data buffer, and installs signal handler for clean exit on interrupt signals. + """ + self.ser = None # Serial connection object + self.buffer = bytearray() # Buffer for incoming data + self.retry_limit = 4 # Maximum connection retries + self.packet_length = None # Expected packet length for current board + self.num_channels = None # Number of data channels for current board + self.data = None # Numpy array for storing channel data + self.board = "" # Detected board name + self.streaming_active = False # Streaming state flag + + # Only install signal handler in the main thread + if threading.current_thread() is threading.main_thread(): + signal.signal(signal.SIGINT, self.signal_handler) + + def connect_hardware(self, port, baudrate, timeout=1): + """ + Attempt to connect to hardware at the specified port and baudrate. + Args: + port (str): Serial port to connect to (e.g., 'COM3' or '/dev/ttyUSB0') + baudrate (int): Baud rate for serial communication + timeout (float, optional): Serial timeout in seconds. Defaults to 1. + Returns: + bool: True if connection and board identification succeeded, False otherwise + + The method performs the following steps: + 1. Establishes serial connection + 2. Sends 'WHORU' command + 3. Validates response against supported boards + 4. Configures parameters based on detected board + """ + try: + self.ser = serial.Serial(port, baudrate=baudrate, timeout=timeout) # Initialize serial connection + retry_counter = 0 + response = None + + while retry_counter < self.retry_limit: # Try to identify the board with retries + self.ser.write(b'WHORU\n') # Send identification command + try: + response = self.ser.readline().strip().decode() + except UnicodeDecodeError: + response = None + + if response in self.supported_boards: # Board identified successfully + self.board = response + print(f"{response} detected at {port} with baudrate {baudrate}") + self.num_channels = self.supported_boards[self.board]["Num_channels"] + sampling_rate = self.supported_boards[self.board]["sampling_rate"] + self.packet_length = (2 * self.num_channels) + self.HEADER_LENGTH + 1 # Calculate expected packet length: 2 bytes per channel + header + end byte + self.data = np.zeros((self.num_channels, 2000)) # Initialize data buffer with 2000 samples per channel + return True + + retry_counter += 1 + + # Connection failed after retries + self.ser.close() + except Exception as e: + print(f"Connection Error: {e}") + return False + + def detect_hardware(self, timeout=1): + """ + Automatically detect and connect to supported hardware. + Scans available serial ports and tries common baud rates to find a supported CHORDS USB device. + Args: + timeout (float, optional): Serial timeout in seconds. Defaults to 1. + Returns: + bool: True if hardware was detected and connected, False otherwise + """ + baudrates = [230400, 115200] # Common baud rates to try with + ports = serial.tools.list_ports.comports() # Get list of available serial ports + + # Try all ports and baud rates + for port in ports: + for baud in baudrates: + print(f"Trying {port.device} at {baud}...") + if self.connect_hardware(port.device, baud, timeout): + return True + + print("Unable to detect supported hardware.") + return False + + def send_command(self, command): + """ + Send a command to the connected hardware. + Args: + command (str): Command to send (e.g., 'START', 'STOP') + Returns: + str: Response from hardware if available, None otherwise + Note: Flushes input/output buffers before sending command to ensure clean communication + """ + if self.ser and self.ser.is_open: + self.ser.flushInput() # Clear buffers to avoid stale data + self.ser.flushOutput() # Clear buffers to avoid stale data + self.ser.write(f"{command}\n".encode()) # Send command with newline terminator + time.sleep(0.1) # Small delay to allow hardware response + response = self.ser.readline().decode('utf-8', errors='ignore').strip() # Read and decode response + return response + return None + + def read_data(self): + """ + Read and process incoming data from the serial connection. Parses packets, validates them, and stores channel data in the data buffer. + serial.SerialException raised: If serial port is disconnected or no data received + """ + try: + # Read available data or wait for at least 1 byte + raw_data = self.ser.read(self.ser.in_waiting or 1) + if raw_data == b'': + raise serial.SerialException("Serial port disconnected or No data received.") + self.buffer.extend(raw_data) + + # Process complete packets in the buffer + while len(self.buffer) >= self.packet_length: + sync_index = self.buffer.find(bytes([self.SYNC_BYTE1, self.SYNC_BYTE2])) # Find synchronization bytes in buffer + if sync_index == -1: + self.buffer.clear() # No sync found, clear buffer + continue + + # Check if we have a complete packet + if len(self.buffer) >= sync_index + self.packet_length: + packet = self.buffer[sync_index:sync_index + self.packet_length] + # Validate packet structure + if (packet[0] == self.SYNC_BYTE1 and packet[1] == self.SYNC_BYTE2 and packet[-1] == self.END_BYTE): + channel_data = [] # Extract channel data + + for ch in range(self.num_channels): + # Combine high and low bytes for each channel + high_byte = packet[2 * ch + self.HEADER_LENGTH] + low_byte = packet[2 * ch + self.HEADER_LENGTH + 1] + value = (high_byte << 8) | low_byte + channel_data.append(float(value)) + + self.data = np.roll(self.data, -1, axis=1) # Update data buffer (rolling window) + self.data[:, -1] = channel_data + del self.buffer[:sync_index + self.packet_length] # Remove processed packet from buffer + else: + del self.buffer[:sync_index + 1] # Invalid packet, skip the first sync byte + except serial.SerialException: + self.cleanup() + + def start_streaming(self): + """ + Start continuous data streaming from the hardware by sending the 'START' command and enters a loop to continuously read and process incoming data until stopped or interrupted. + """ + self.send_command('START') + self.streaming_active = True + try: + while self.streaming_active: + self.read_data() + except KeyboardInterrupt: + print("KeyboardInterrupt received.") + self.cleanup() + + def stop_streaming(self): + """ + Stop data streaming by sending 'STOP' Command to the hardware and sets the streaming flag to False. + """ + self.streaming_active = False + self.send_command('STOP') + + def cleanup(self): + """ + Clean up resources and ensure proper shutdown.It stops streaming, closes serial connection, and handles any cleanup errors. + """ + self.stop_streaming() + try: + if self.ser and self.ser.is_open: + self.ser.close() + except Exception as e: + print(f"Error during cleanup: {e}") + + def signal_handler(self, sig, frame): + """ + Signal handler for interrupt signals (Ctrl+C).It ensures clean shutdown when the program is interrupted. + """ + print("\nInterrupt received, shutting down...") + self.cleanup() + sys.exit(0) + + def start_timer(self): + """ + Start the timer for packet counting and logging. + """ + global start_time, last_ten_minute_time, total_packet_count, cumulative_packet_count + current_time = time.time() + start_time = current_time # Session start time + last_ten_minute_time = current_time # 10-minute interval start time + total_packet_count = 0 # Counter for packets in current second + cumulative_packet_count = 0 # Counter for all packets + + def log_one_second_data(self): + """ + Log data for one second intervals and displays: Number of packets received in the last second, Number of missing samples (if any) + """ + global total_packet_count, samples_per_second, missing_samples + samples_per_second = total_packet_count + print(f"Data count for the last second: {total_packet_count} samples, "f"Missing samples: {missing_samples}") + total_packet_count = 0 # Reset for next interval + + def log_ten_minute_data(self): + """ + Log data for 10-minute intervals and displays: Total packets received, Actual sampling rate, Drift from expected rate + """ + global cumulative_packet_count, last_ten_minute_time, supported_boards, board + print(f"Total data count after 10 minutes: {cumulative_packet_count}") + sampling_rate = cumulative_packet_count / (10 * 60) # Calculate actual sampling rate + print(f"Sampling rate: {sampling_rate:.2f} samples/second") + expected_sampling_rate = supported_boards[board]["sampling_rate"] + drift = ((sampling_rate - expected_sampling_rate) / expected_sampling_rate) * 3600 # Calculate drift from expected rate + print(f"Drift: {drift:.2f} seconds/hour") + + # Reset counters + cumulative_packet_count = 0 + last_ten_minute_time = time.time() + +if __name__ == "__main__": + client = Chords_USB() # Create and run the USB client + client.detect_hardware() # Detect and connect to hardware + client.start_streaming() # Start streaming data \ No newline at end of file diff --git a/chords_wifi.py b/chordspy/chords_wifi.py similarity index 54% rename from chords_wifi.py rename to chordspy/chords_wifi.py index 33b91f1b..2fdd43cf 100644 --- a/chords_wifi.py +++ b/chordspy/chords_wifi.py @@ -1,17 +1,52 @@ +""" +CHORDS WiFi Data Acquisition Script: This script provides a WebSocket client for connecting to and receiving data from +a CHORDS-compatible WiFi device. It handles connection management, data reception, and basic data validation. +""" + +# Importing necessary libraries import time import sys import websocket import socket -from scipy.signal import butter, filtfilt class Chords_WIFI: + """ + A class for connecting to and receiving data from a CHORDS WiFi device. + This class handles WebSocket communication with a CHORDS device, processes incoming data packets, and provides basic data validation and rate calculation. + Attributes: + stream_name (str): Name of the data stream (default: 'NPG') + channels (int): Number of data channels (default: 3) + sampling_rate (int): Expected sampling rate in Hz (default: 500) + block_size (int): Size of each data block in bytes (default: 13) + timeout_sec (int): Timeout period for no data received in seconds (default: 1) + packet_size (int): Count of received packets + data_size (int): Total size of received data in bytes + sample_size (int): Count of received samples + previous_sample_number (int): Last received sample number for validation + previous_data (list): Last received channel data + start_time (float): Timestamp when measurement started + last_data_time (float): Timestamp of last received data + cleanup_done (bool): Flag indicating if cleanup was performed + ws (websocket.WebSocket): WebSocket connection object + """ + def __init__(self, stream_name='NPG', channels=3, sampling_rate=500, block_size=13, timeout_sec=1): + """ + Initialize the WiFi client with connection parameters. + Args: + stream_name (str): Name of the data stream (default: 'NPG') + channels (int): Number of data channels (default: 3) + sampling_rate (int): Expected sampling rate in Hz (default: 500) + block_size (int): Size of each data block in bytes (default: 13) + timeout_sec (int): Timeout period for no data in seconds (default: 1) + """ self.stream_name = stream_name self.channels = channels self.sampling_rate = sampling_rate self.block_size = block_size self.timeout_sec = timeout_sec # Timeout for no data received + # Data tracking variables self.packet_size = 0 self.data_size = 0 self.sample_size = 0 @@ -23,8 +58,13 @@ def __init__(self, stream_name='NPG', channels=3, sampling_rate=500, block_size= self.ws = None def connect(self): + """ + Establish WebSocket connection to the CHORDS device. It Attempts to resolve the hostname 'multi-emg.local' and connect to its WebSocket server. + """ try: - host_ip = socket.gethostbyname("multi-emg.local") + host_ip = socket.gethostbyname("multi-emg.local") # Resolve hostname to IP address + + # Create and connect WebSocket self.ws = websocket.WebSocket() self.ws.connect(f"ws://{host_ip}:81") sys.stderr.write(f"{self.stream_name} WebSocket connected!\n") @@ -34,13 +74,30 @@ def connect(self): sys.exit(1) def calculate_rate(self, size, elapsed_time): + """ + Calculate rate (samples/packets/bytes per second). + Args: + size (int): Count of items (samples, packets, or bytes) + elapsed_time (float): Time period in seconds + Returns: + float: Rate in items per second, or 0 if elapsed_time is 0 + """ return size / elapsed_time if elapsed_time > 0 else 0 def process_data(self): + """ + Main data processing loop. It continuously receives data from the WebSocket, validates samples, and calculates rates. Handles connection errors and timeouts. + The method: + 1. Receives data from WebSocket + 2. Checks for connection timeouts + 3. Calculates rates every second + 4. Validates sample sequence numbers + 5. Processes channel data + """ try: while True: try: - data = self.ws.recv() + data = self.ws.recv() # Receive data from WebSocket self.last_data_time = time.time() # Update when data is received except (websocket.WebSocketConnectionClosedException, ConnectionResetError) as e: print(f"\nConnection closed: {str(e)}") @@ -60,43 +117,51 @@ def process_data(self): # Process your data here self.data_size += len(data) - current_time = time.time() + current_time = time.time() # Calculate rates every second elapsed_time = current_time - self.start_time if elapsed_time >= 1.0: + # Calculate samples, packets, and bytes per second sps = self.calculate_rate(self.sample_size, elapsed_time) fps = self.calculate_rate(self.packet_size, elapsed_time) bps = self.calculate_rate(self.data_size, elapsed_time) + # Reset counters self.packet_size = 0 self.sample_size = 0 self.data_size = 0 self.start_time = current_time + # Process binary data if isinstance(data, (bytes, list)): self.packet_size += 1 + # Process each block in the packet for i in range(0, len(data), self.block_size): self.sample_size += 1 block = data[i:i + self.block_size] - if len(block) < self.block_size: + if len(block) < self.block_size: # Skip incomplete blocks continue - sample_number = block[0] + sample_number = block[0] # Extract sample number (first byte) channel_data = [] + # Extract channel data (2 bytes per channel) for ch in range(self.channels): offset = 1 + ch * 2 sample = int.from_bytes(block[offset:offset + 2], byteorder='big', signed=True) channel_data.append(sample) + # Validate sample sequence if self.previous_sample_number == -1: self.previous_sample_number = sample_number self.previous_data = channel_data else: + # Check for missing samples if sample_number - self.previous_sample_number > 1: print("\nError: Sample Lost") self.cleanup() sys.exit(1) + # Check for duplicate samples elif sample_number == self.previous_sample_number: print("\nError: Duplicate Sample") self.cleanup() @@ -113,6 +178,9 @@ def process_data(self): self.cleanup() def cleanup(self): + """ + Clean up resources and close connections. It safely closes the WebSocket connection if it exists and ensures cleanup only happens once. + """ if not self.cleanup_done: try: if hasattr(self, 'ws') and self.ws: @@ -124,12 +192,15 @@ def cleanup(self): self.cleanup_done = True def __del__(self): + """ + Destructor to ensure cleanup when object is garbage collected. + """ self.cleanup() if __name__ == "__main__": client = None try: - client = Chords_WIFI() + client = Chords_WIFI() # Create and run WiFi client client.connect() client.process_data() except Exception as e: diff --git a/config/apps.yaml b/chordspy/config/apps.yaml similarity index 100% rename from config/apps.yaml rename to chordspy/config/apps.yaml diff --git a/chordspy/connection.py b/chordspy/connection.py new file mode 100644 index 00000000..1ff1d8d7 --- /dev/null +++ b/chordspy/connection.py @@ -0,0 +1,681 @@ +""" +CHORDS Data Connection- +This scripts provides a unified interface for connecting to CHORDS devices via multiple protocols +(USB, WiFi, BLE) and streaming data to LSL (Lab Streaming Layer) and/or CSV files. + +Key Features: +- Multi-protocol support (USB, WiFi, BLE) +- Simultaneous LSL streaming and CSV recording +- Automatic device discovery and connection + +Typical Usage: +1. Initialize Connection object +2. Connect to device via preferred protocol +3. Configure LSL stream parameters +4. Start data streaming/CSV recording +5. Process incoming data +6. Clean shutdown on exit +""" + +# Importing necessary libraries +from chordspy.chords_serial import Chords_USB # USB protocol handler +from chordspy.chords_wifi import Chords_WIFI # WiFi protocol handler +from chordspy.chords_ble import Chords_BLE # BLE protocol handler +import argparse # For command-line argument parsing +import time # For timing operations and timestamps +import asyncio # For asynchronous BLE operations +import csv # For CSV file recording +from datetime import datetime # For timestamp generation +import threading # For multi-threaded operations +from collections import deque # For efficient rate calculation +from pylsl import StreamInfo, StreamOutlet # LSL streaming components +from pylsl import StreamInlet, resolve_stream # LSL stream resolution +from pylsl import local_clock # For precise timing +import numpy as np # For numerical operations + +class Connection: + """ + Main connection manager class for supported devices. + This class serves as the central hub for all device communication, providing: + - Unified interface across multiple connection protocols(WiFi/BLE/USB) + - Data streaming to LSL + - Data recording to CSV files + - Connection state management + - Sample validation and rate monitorin + The class maintains separate connection handlers for each protocol (USB/WiFi/BLE) + and manages their lifecycle. It implements thread-safe operations for concurrent + data handling and provides clean shutdown procedures. + """ + def __init__(self): + """ + Initialize the connection manager with default values. + """ + # Protocol Connection Handlers + self.ble_connection = None # BLE protocol handler + self.wifi_connection = None # WiFi protocol handler + self.usb_connection = None # USB protocol handler + self.lsl_connection = None # LSL stream outlet (created when streaming starts) + + # LSL Stream Configuration + self.stream_name = "BioAmpDataStream" # Default LSL stream name + self.stream_type = "EXG" # LSL stream type + self.stream_format = "float32" # Data format for LSL samples + self.stream_id = "UDL" # Unique stream identifier + + # Data Tracking Systems + self.last_sample = None # Stores the most recent sample received + self.samples_received = 0 # Total count of samples received + self.start_time = time.time() # Timestamp when connection was established + + # CSV Recording Systems + self.csv_file = None # File handle for CSV output + self.csv_writer = None # CSV writer object + self.sample_counter = 0 # Count of samples written to CSV + + # Stream Parameters + self.num_channels = 0 # Number of data channels + self.sampling_rate = 0 # Current sampling rate in Hz + + # System State Flags + self.stream_active = False # True when LSL streaming is active + self.recording_active = False # True when CSV recording is active + + # Thread Management + self.usb_thread = None # Thread for USB data handling + self.ble_thread = None # Thread for BLE data handling + self.wifi_thread = None # Thread for WiFi data handling + self.running = False # Main system running flag + + # Rate Monitoring Systems + self.sample_count = 0 # Samples received in current interval + self.rate_window = deque(maxlen=10) # Window for rate calculation + self.last_timestamp = time.perf_counter() # Last rate calculation time + self.rate_update_interval = 0.5 # Seconds between rate updates + self.ble_samples_received = 0 # Count of BLE-specific samples + + async def get_ble_device(self): + """ + Scan for and select a BLE device interactively. + This asynchronous method: Scans for available BLE devices using Chords_BLE scanner, presents discovered devices to user, handles user selection, returns selected device object. + Returns: + Device: The selected BLE device object or None if no devices found, invalid selection, user cancellation. + """ + devices = await Chords_BLE.scan_devices() # Scan for available BLE devices + + # Handle case where no devices are found + if not devices: + print("No NPG devices found!") + return None + + print("\nFound NPG Devices:") # Display discovered devices to user + for i, device in enumerate(devices): + print(f"[{i}] {device.name} - {device.address}") + + try: + selection = int(input("\nEnter device number to connect: ")) # Get user selection + if 0 <= selection < len(devices): # Validate selection + return devices[selection] + print("Invalid selection!") + return None + except (ValueError, KeyboardInterrupt): + print("\nCancelled.") # Handle invalid input or user cancellation + return None + + def setup_lsl(self, num_channels, sampling_rate): + """ + Set up LSL (Lab Streaming Layer) stream outlet. + This method: creates a new LSL stream info object, initializes the LSL outlet, updates stream parameters, sets streaming state flag. + Args: + num_channels (int): Number of data channels in stream + sampling_rate (float): Sampling rate in Hz + """ + # Create LSL stream info with configured parameters + info = StreamInfo(self.stream_name, self.stream_type, num_channels, sampling_rate, self.stream_format, self.stream_id) + self.lsl_connection = StreamOutlet(info) # Initialize LSL outlet + print(f"LSL stream started: {num_channels} channels at {sampling_rate}Hz") + self.stream_active = True + self.num_channels = num_channels + self.sampling_rate = sampling_rate + + def start_csv_recording(self, filename=None): + """ + Start CSV recording session. + This method: Verify recording isn't already active, generates filename, opens CSV file and initializes writer, writes column headers, sets recording state flag. + Args: + filename (str, optional): Custom filename without extension + Returns: + bool: True if recording started successfully, False otherwise + """ + # Check if recording is already active + if self.recording_active: + return False + + try: + # Generate filename if not provided + if not filename: + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + filename = f"ChordsPy_{timestamp}.csv" + elif not filename.endswith('.csv'): + filename += '.csv' + + self.csv_file = open(filename, 'w', newline='') # Open CSV file and initialize writer + headers = ['Counter'] + [f'Channel{i+1}' for i in range(self.num_channels)] # Create column headers + self.csv_writer = csv.writer(self.csv_file) + self.csv_writer.writerow(headers) + self.recording_active = True # Update state + self.sample_counter = 0 + print(f"CSV recording started: {filename}") + return True + except Exception as e: + print(f"Error starting CSV recording: {str(e)}") # Handle file operation errors + return False + + def stop_csv_recording(self): + """ + Stop CSV recording session. + This method: Validates recording is active, closes CSV file, cleans up resources, resets recording state. + Returns: + bool: True if recording stopped successfully, False otherwise + """ + # Check if recording is inactive + if not self.recording_active: + return False + + try: + # Close file and clean up + if self.csv_file: + self.csv_file.close() + self.csv_file = None + self.csv_writer = None + self.recording_active = False # Update state + print("CSV recording stopped") + return True + except Exception as e: + print(f"Error stopping CSV recording: {str(e)}") # Handle file closing errors + return False + + def log_to_csv(self, sample_data): + """ + Log a sample to CSV file. + This method: Validates recording is active, formats sample data, writes to CSV, handles write errors. + Args: + sample_data (list): List of channel values to record + """ + # Check if recording is inactive + if not self.recording_active or not self.csv_writer: + return + + try: + # Format and write sample + self.sample_counter += 1 + row = [self.sample_counter] + sample_data + self.csv_writer.writerow(row) + except Exception as e: + print(f"Error writing to CSV: {str(e)}") # Handle write errors and stop recording + self.stop_csv_recording() + + def update_sample_rate(self): + """ + Update and display current sample rate. It calculates rate over a moving window and prints to console. It uses perf_counter() for highest timing precision. + """ + now = time.perf_counter() # Get current high-resolution timestamp + elapsed = now - self.last_timestamp # Calculate time elapsed since last calculation + self.sample_count += 1 # Increment sample counter for this interval + + # Only update display if we've collected enough time (default 0.5s) + if elapsed >= self.rate_update_interval: + current_rate = self.sample_count / elapsed # Calculate current instantaneous rate (samples/second) + self.rate_window.append(current_rate) # Add to our moving window of recent rates (default 10 values) + + # Print average rate + avg_rate = sum(self.rate_window) / len(self.rate_window) + print(f"\rCurrent sampling rate: {avg_rate:.2f} Hz", end="", flush=True) # Using \r to overwrite previous line + + # Reset counters for next interval + self.sample_count = 0 + self.last_timestamp = now + + def lsl_rate_checker(self, duration=1.0): + """ + Independently verifies the actual streaming rate of the LSL outlet. + This method: Collects timestamps over a measurement period -> calculates rate from timestamp differences. + Args: + duration: Measurement duration in seconds + """ + try: + streams = resolve_stream('type', self.stream_type) + if not streams: + print("No LSL stream found to verify.") + return + inlet = StreamInlet(streams[0]) # Create an inlet to receive data + timestamps = [] + start_time = time.time() + + # Collect data for specified duration + while time.time() - start_time < duration: + sample, ts = inlet.pull_sample(timeout=1.0) + if ts: + timestamps.append(ts) + + if len(timestamps) > 10: + diffs = np.diff(timestamps) # Calculate time differences between consecutive samples + filtered_diffs = [d for d in diffs if d > 0] # Filter out zero/negative differences (invalid) + if filtered_diffs: + estimated_rate = 1 / np.mean(filtered_diffs) # Rate = 1/average interval between samples + else: + print("\nAll timestamps had zero difference (invalid).") + else: + print("\nNot enough timestamps collected to estimate rate.") + + except Exception as e: + print(f"Error in LSL rate check: {str(e)}") + + def ble_data_handler(self): + """ + BLE-specific data handler with precise timing control. + The handler ensures: + 1. Precise sample timing using local_clock() + 2. Constant sampling rate regardless of BLE packet timing + 3. Graceful handling of buffer overflows + 4. Thread-safe operation with the main controller + """ + # Target specifications for the BLE stream + SAMPLE_INTERVAL = 1.0 / self.sampling_rate # Time between samples in seconds + next_sample_time = local_clock() # Initialize timing baseline + + # Main processing loop - runs while system is active and BLE connected + while self.running and self.ble_connection: + try: + # Check if new BLE data is available + if hasattr(self.ble_connection, 'data_available') and self.ble_connection.data_available: + current_time = local_clock() # Get precise current timestamp + + # Only process if we've reached the next scheduled sample time + if current_time >= next_sample_time: + sample = self.ble_connection.get_latest_sample() + if sample: + channel_data = sample[:self.num_channels] # Extract channel data + + # Calculate precise timestamp + sample_time = next_sample_time + next_sample_time += SAMPLE_INTERVAL # Schedule next sample time + + # If we're falling behind, skip samples to catch up + if current_time > next_sample_time + SAMPLE_INTERVAL: + next_sample_time = current_time + SAMPLE_INTERVAL + + # Stream to LSL if enabled + if self.lsl_connection: + self.lsl_connection.push_sample(channel_data, timestamp=sample_time) + + # Update rate display + self.update_sample_rate() + + # Log to CSV if recording + if self.recording_active: + self.log_to_csv(channel_data) + except Exception as e: + print(f"BLE data handler error: {str(e)}") + break + + def wifi_data_handler(self): + """ + WiFi-specific data handler with network-optimized timing. + """ + SAMPLE_INTERVAL = 1.0 / self.sampling_rate # Time between samples in seconds + next_sample_time = local_clock() # Initialize timing baseline + + while self.running and self.wifi_connection: + try: + # Verify WiFi data is available + if hasattr(self.wifi_connection, 'data_available') and self.wifi_connection.data_available: + current_time = local_clock() + + # Timing gate ensures precise sample rate + if current_time >= next_sample_time: + sample = self.wifi_connection.get_latest_sample() + if sample: + channel_data = sample[:self.num_channels] + + # Calculate precise timestamp + sample_time = next_sample_time + next_sample_time += SAMPLE_INTERVAL + + # If we're falling behind, skip samples to catch up + if current_time > next_sample_time + SAMPLE_INTERVAL: + next_sample_time = current_time + SAMPLE_INTERVAL + + if self.lsl_connection: + self.lsl_connection.push_sample(channel_data, timestamp=sample_time) + + self.update_sample_rate() + + if self.recording_active: + self.log_to_csv(channel_data) + except Exception as e: + print(f"WiFi data handler error: {str(e)}") + break + + def usb_data_handler(self): + """ + USB data handler with serial port optimization. + """ + SAMPLE_INTERVAL = 1.0 / self.sampling_rate # Time between samples in seconds + next_sample_time = local_clock() # Initialize timing baseline + + while self.running and self.usb_connection: + try: + # Verify USB port is open and active + if hasattr(self.usb_connection, 'ser') and self.usb_connection.ser.is_open: + self.usb_connection.read_data() # Read raw data from serial port + + # Process if new data exists + if hasattr(self.usb_connection, 'data'): + current_time = local_clock() + + if current_time >= next_sample_time: + sample = self.usb_connection.data[:, -1] # Get most recent sample from numpy array + channel_data = sample.tolist() # Convert to list format + + # Calculate precise timestamp + sample_time = next_sample_time + next_sample_time += SAMPLE_INTERVAL + + # USB-specific overflow handling + if current_time > next_sample_time + SAMPLE_INTERVAL: + next_sample_time = current_time + SAMPLE_INTERVAL + + if self.lsl_connection: + self.lsl_connection.push_sample(channel_data, timestamp=sample_time) + + self.update_sample_rate() + + if self.recording_active: + self.log_to_csv(channel_data) + except Exception as e: + print(f"\nUSB data handler error: {str(e)}") + break + + def connect_ble(self, device_address=None): + """ + Establishes and manages a Bluetooth Low Energy (BLE) connection with a device. + The method handles the complete BLE lifecycle including: + - Device discovery and selection (if no address provided) + - Connection establishment + - Data stream configuration + - Real-time data processing pipeline + Args: + device_address (str, optional): MAC address in "XX:XX:XX:XX:XX:XX" format. If None, initiates interactive device selection. + Returns: + bool: True if connection succeeds, False on failure + Workflow: Initialize BLE handler instance -> Configure custom data notification handler -> Establish connection (direct or interactive) -> Set up data processing pipeline -> Maintain connection until termination. + """ + # Initialize BLE protocol handler + self.ble_connection = Chords_BLE() + original_notification_handler = self.ble_connection.notification_handler + + def notification_handler(sender, data): + if len(data) == self.ble_connection.NEW_PACKET_LEN: + if not self.lsl_connection: + self.setup_lsl(num_channels=3, sampling_rate=500) + + original_notification_handler(sender, data) + + for i in range(0, self.ble_connection.NEW_PACKET_LEN, self.ble_connection.SINGLE_SAMPLE_LEN): + sample_data = data[i:i+self.ble_connection.SINGLE_SAMPLE_LEN] + if len(sample_data) == self.ble_connection.SINGLE_SAMPLE_LEN: + channels = [int.from_bytes(sample_data[i:i+2], byteorder='big', signed=True) + for i in range(1, len(sample_data), 2)] + self.last_sample = channels + self.ble_samples_received += 1 + + if self.lsl_connection: # Push to LSL + self.lsl_connection.push_sample(channels) + if self.recording_active: + self.log_to_csv(channels) + + self.ble_connection.notification_handler = notification_handler + + try: + if device_address: + print(f"Connecting to BLE device: {device_address}") + self.ble_connection.connect(device_address) + else: + selected_device = asyncio.run(self.get_ble_device()) + if not selected_device: + return False + print(f"Connecting to BLE device: {selected_device.name}") + self.ble_connection.connect(selected_device.address) + + self.running = True + self.ble_thread = threading.Thread(target=self.ble_data_handler) + self.ble_thread.daemon = True + self.ble_thread.start() + threading.Thread(target=self.lsl_rate_checker, daemon=True).start() # Start independent rate monitoring + return True + except Exception as e: + print(f"BLE connection failed: {str(e)}") + return False + + def connect_wifi(self): + """ + Manages WiFi connection and data streaming for CHORDS devices. + Implements a persistent connection loop that: + - Maintains websocket connection + - Validates incoming data blocks + - Handles data conversion and distribution + - Provides graceful shutdown on interrupt + The method runs continuously until KeyboardInterrupt, automatically cleaning up resources on exit. + """ + # Initialize WiFi handler and establish connection + self.wifi_connection = Chords_WIFI() + self.wifi_connection.connect() + + # Configure stream parameters from device + self.num_channels = self.wifi_connection.channels + sampling_rate = self.wifi_connection.sampling_rate + + # Initialize LSL stream if needed + if not self.lsl_connection: + self.setup_lsl(self.num_channels, sampling_rate) + + # Start the data handler thread + self.running = True + self.wifi_thread = threading.Thread(target=self.wifi_data_handler) + self.wifi_thread.daemon = True + self.wifi_thread.start() + threading.Thread(target=self.lsl_rate_checker, daemon=True).start() # Start independent rate monitoring + + try: + print("\nConnected! (Press Ctrl+C to stop)") + while True: + data = self.wifi_connection.ws.recv() # Receive data via websocket + + # Handle both binary and text-formatted data + if isinstance(data, (bytes, list)): + # Process data in protocol-defined blocks + block_size = self.wifi_connection.block_size + for i in range(0, len(data), block_size): + block = data[i:i + block_size] + + # Skip partial blocks + if len(block) < block_size: + continue + + # Extract and convert channel samples + channel_data = [] + for ch in range(self.wifi_connection.channels): + offset = 1 + ch * 2 # Calculate byte offset for each channel + sample = int.from_bytes(block[offset:offset + 2], byteorder='big', signed=True) + channel_data.append(sample) + + if self.lsl_connection: # Push to LSL + self.lsl_connection.push_sample(channel_data) + + # Record to CSV + if self.recording_active: + self.log_to_csv(channel_data) + + except KeyboardInterrupt: + print("\nDisconnected") + finally: + self.stop_csv_recording() # Ensure resources are released + + def connect_usb(self): + """ + Handles USB device connection and data streaming. + Implements: Automatic device detection, Hardware-specific configuration, Multi-threaded data handling, Rate monitoring. + Returns: + bool: True if successful initialization, False on failure + """ + # Initialize USB handler + self.usb_connection = Chords_USB() + # Detect and validate connected hardware + if not self.usb_connection.detect_hardware(): + return False + + # Configure stream based on detected board + self.num_channels = self.usb_connection.num_channels + board_config = self.usb_connection.supported_boards[self.usb_connection.board] + self.sampling_rate = board_config["sampling_rate"] + + # Initialize LSL stream + self.setup_lsl(self.num_channels, self.sampling_rate) + + # Start the USB streaming command + self.usb_connection.send_command('START') + + # Start the data handler thread + self.running = True + self.usb_thread = threading.Thread(target=self.usb_data_handler) + self.usb_thread.daemon = True + self.usb_thread.start() + + # Start independent rate monitoring + threading.Thread(target=self.lsl_rate_checker, daemon=True).start() + return True + + def cleanup(self): + """ + Clean up all resources and connections in a safe and orderly manner. + The cleanup process follows this sequence: First stop data recording -> Then stop LSL streaming -> Next terminate all threads -> Finally close all hardware connections. + """ + self.running = False # Signal all threads to stop + self.stop_csv_recording() # Stop CSV recording if active + + # Clean up LSL stream if active + if self.lsl_connection: + self.lsl_connection = None + self.stream_active = False + print("\nLSL stream stopped") + + # Collect all active threads + threads = [] + if self.usb_thread and self.usb_thread.is_alive(): + threads.append(self.usb_thread) + if self.ble_thread and self.ble_thread.is_alive(): + threads.append(self.ble_thread) + if self.wifi_thread and self.wifi_thread.is_alive(): + threads.append(self.wifi_thread) + + # Wait for threads to finish (with timeout to prevent hanging) + for t in threads: + t.join(timeout=1) # 1 second timeout per thread + + # Clean up USB connection + if self.usb_connection: + try: + # Check if serial port is open and send stop command + if hasattr(self.usb_connection, 'ser') and self.usb_connection.ser.is_open: + self.usb_connection.send_command('STOP') # Graceful stop + self.usb_connection.ser.close() # Close serial port + print("USB connection closed") + except Exception as e: + print(f"Error closing USB connection: {str(e)}") + finally: + self.usb_connection = None + + # Clean up BLE connection + if self.ble_connection: + try: + self.ble_connection.stop() # Stop BLE operations + print("BLE connection closed") + except Exception as e: + print(f"Error closing BLE connection: {str(e)}") + finally: + self.ble_connection = None + + # Clean up WiFi connection + if self.wifi_connection: + try: + self.wifi_connection.cleanup() # WiFi-specific cleanup + print("WiFi connection closed") + except Exception as e: + print(f"Error closing WiFi connection: {str(e)}") + finally: + self.wifi_connection = None + + # Reset all state flags + self.stream_active = False + self.recording_active = False + + def __del__(self): + """ + Destructor to ensure cleanup when object is garbage collected. It simply calls the main cleanup method. + """ + self.cleanup() + +def main(): + """ + Main entry point for command line execution of the CHORDS connection manager. + It handles: Command line argument parsing, protocol-specific connection setup, main execution loop, clean shutdown on exit. + + Usage Examples: + $ python chords_connection.py --protocol usb + $ python chords_connection.py --protocol wifi + $ python chords_connection.py --protocol ble + $ python chords_connection.py --protocol ble --ble-address AA:BB:CC:DD:EE:FF + + The main execution flow: + 1. Parse command line arguments + 2. Create connection manager instance + 3. Establish requested connection + 4. Enter main loop (until interrupted) + 5. Clean up resources on exit + """ + # Set up command line argument parser + parser = argparse.ArgumentParser(description='Connect to device') + parser.add_argument('--protocol', choices=['usb', 'wifi', 'ble'], required=True, help='Connection protocol to use (usb|wifi|ble)') + parser.add_argument('--ble-address', help='Direct BLE device address') + + args = parser.parse_args() # Parse command line arguments + manager = Connection() # Create connection manager instance + + try: + # USB Protocol Handling + if args.protocol == 'usb': + if manager.connect_usb(): # Attempt USB connection + while manager.running: # Main execution loop + time.sleep(1) # Prevent CPU overutilization + + # WiFi Protocol Handling + elif args.protocol == 'wifi': + if manager.connect_wifi(): # Attempt WiFi connection + while manager.running: # Main execution loop + time.sleep(1) # Prevent CPU overutilization + + # BLE Protocol Handling + elif args.protocol == 'ble': + if manager.connect_ble(args.ble_address): # Attempt BLE connection + while manager.running: # Main execution loop + time.sleep(1) # Prevent CPU overutilization + + except KeyboardInterrupt: + print("\nCleanup Completed.") + except Exception as e: + print(f"\nError: {str(e)}") + finally: + manager.cleanup() # Ensure cleanup always runs + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/csvplotter.py b/chordspy/csvplotter.py similarity index 97% rename from csvplotter.py rename to chordspy/csvplotter.py index a280faf3..ba8ab566 100644 --- a/csvplotter.py +++ b/chordspy/csvplotter.py @@ -99,7 +99,10 @@ def plot_data(self): ) fig.show() # Display the plot in a new window -if __name__ == "__main__": +def main(): root = tk.Tk() # Create the main Tkinter root window - app = CSVPlotterApp(root) # Create an instance of the CSVPlotterApp class - root.mainloop() # Start the Tkinter main loop \ No newline at end of file + CSVPlotterApp(root) # Create an instance of the CSVPlotterApp class + root.mainloop() # Start the Tkinter main loop + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/emgenvelope.py b/chordspy/emgenvelope.py similarity index 98% rename from emgenvelope.py rename to chordspy/emgenvelope.py index 78139a13..b9e0aeac 100644 --- a/emgenvelope.py +++ b/chordspy/emgenvelope.py @@ -135,9 +135,12 @@ def update_plot(self): print("LSL stream disconnected!") self.timer.stop() self.close() - -if __name__ == "__main__": + +def main(): app = QApplication(sys.argv) window = EMGMonitor() window.show() - sys.exit(app.exec_()) \ No newline at end of file + sys.exit(app.exec_()) + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/eog.py b/chordspy/eog.py similarity index 99% rename from eog.py rename to chordspy/eog.py index e8de7cb8..44a7380b 100644 --- a/eog.py +++ b/chordspy/eog.py @@ -183,9 +183,12 @@ def detect_peaks(self, signal, threshold): return peaks -if __name__ == "__main__": +def main(): app = QApplication(sys.argv) window = EOGMonitor() print("Note: There will be a 2s calibration delay before peak detection starts.") window.show() - sys.exit(app.exec_()) \ No newline at end of file + sys.exit(app.exec_()) + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/ffteeg.py b/chordspy/ffteeg.py similarity index 99% rename from ffteeg.py rename to chordspy/ffteeg.py index a46991a4..cbcdb367 100644 --- a/ffteeg.py +++ b/chordspy/ffteeg.py @@ -418,8 +418,11 @@ def update_brainpower_plot(self): relative_powers = [band_powers['delta'], band_powers['theta'], band_powers['alpha'], band_powers['beta'], band_powers['gamma']] self.brainwave_bars.setOpts(height=relative_powers) -if __name__ == "__main__": +def main(): app = QApplication(sys.argv) window = EEGMonitor() window.show() - sys.exit(app.exec_()) \ No newline at end of file + sys.exit(app.exec_()) + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/game.py b/chordspy/game.py similarity index 99% rename from game.py rename to chordspy/game.py index eddc9444..09fe0450 100644 --- a/game.py +++ b/chordspy/game.py @@ -129,7 +129,7 @@ def eeg_data_thread(eeg_queue): continue try: sample, timestamp = inlet.pull_sample() - if len(sample) >= 6: + if len(sample) >= 3: channel1_data = sample[0] # PLAYER A channel2_data = sample[1] # PLAYER B diff --git a/gui.py b/chordspy/gui.py similarity index 98% rename from gui.py rename to chordspy/gui.py index 00f9d78d..bc79daa3 100644 --- a/gui.py +++ b/chordspy/gui.py @@ -122,7 +122,10 @@ def init_gui(): return app -if __name__ == "__main__": +def main(): plot_lsl_data() if inlet: - sys.exit(app.exec_()) # Start the Qt application only if a stream was connected \ No newline at end of file + sys.exit(app.exec_()) + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/heartbeat_ecg.py b/chordspy/heartbeat_ecg.py similarity index 99% rename from heartbeat_ecg.py rename to chordspy/heartbeat_ecg.py index aa10fcf8..4dda0bfc 100644 --- a/heartbeat_ecg.py +++ b/chordspy/heartbeat_ecg.py @@ -160,8 +160,11 @@ def plot_r_peaks(self, filtered_ecg): r_peak_values = filtered_ecg[self.r_peaks] # Get corresponding ECG values self.r_peak_curve.setData(r_peak_times, r_peak_values) # Plot R-peaks as red dots -if __name__ == "__main__": +def main(): app = QApplication(sys.argv) window = ECGMonitor() window.show() - sys.exit(app.exec_()) \ No newline at end of file + sys.exit(app.exec_()) + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/keystroke.py b/chordspy/keystroke.py similarity index 98% rename from keystroke.py rename to chordspy/keystroke.py index 4566cf1f..c3d49026 100644 --- a/keystroke.py +++ b/chordspy/keystroke.py @@ -203,7 +203,7 @@ def move(event): horizontal_frame = tk.Frame(popup) horizontal_frame.pack(expand=True, pady=10) - eye_icon = PhotoImage(file="media\\icons8-eye-30.png") + eye_icon = PhotoImage(file="C:/Users/PAYAL/Desktop/Chords-Python/chordspy/media/icons8-eye-30.png") blink_button = tk.Button(horizontal_frame, image=eye_icon, width=70, height=38, bg="#FFFFFF") blink_button.image = eye_icon diff --git a/media/Beetle1.png b/chordspy/media/Beetle1.png similarity index 100% rename from media/Beetle1.png rename to chordspy/media/Beetle1.png diff --git a/media/Beetle10.png b/chordspy/media/Beetle10.png similarity index 100% rename from media/Beetle10.png rename to chordspy/media/Beetle10.png diff --git a/media/Beetle2.png b/chordspy/media/Beetle2.png similarity index 100% rename from media/Beetle2.png rename to chordspy/media/Beetle2.png diff --git a/media/Beetle3.png b/chordspy/media/Beetle3.png similarity index 100% rename from media/Beetle3.png rename to chordspy/media/Beetle3.png diff --git a/media/Beetle4.png b/chordspy/media/Beetle4.png similarity index 100% rename from media/Beetle4.png rename to chordspy/media/Beetle4.png diff --git a/media/Beetle5.png b/chordspy/media/Beetle5.png similarity index 100% rename from media/Beetle5.png rename to chordspy/media/Beetle5.png diff --git a/media/Beetle6.png b/chordspy/media/Beetle6.png similarity index 100% rename from media/Beetle6.png rename to chordspy/media/Beetle6.png diff --git a/media/Beetle7.png b/chordspy/media/Beetle7.png similarity index 100% rename from media/Beetle7.png rename to chordspy/media/Beetle7.png diff --git a/media/Beetle8.png b/chordspy/media/Beetle8.png similarity index 100% rename from media/Beetle8.png rename to chordspy/media/Beetle8.png diff --git a/media/Beetle9.png b/chordspy/media/Beetle9.png similarity index 100% rename from media/Beetle9.png rename to chordspy/media/Beetle9.png diff --git a/media/Interface.png b/chordspy/media/Interface.png similarity index 100% rename from media/Interface.png rename to chordspy/media/Interface.png diff --git a/media/Webinterface.png b/chordspy/media/Webinterface.png similarity index 100% rename from media/Webinterface.png rename to chordspy/media/Webinterface.png diff --git a/media/brass-fanfare-with-timpani-and-winchimes-reverberated-146260.wav b/chordspy/media/brass-fanfare-with-timpani-and-winchimes-reverberated-146260.wav similarity index 100% rename from media/brass-fanfare-with-timpani-and-winchimes-reverberated-146260.wav rename to chordspy/media/brass-fanfare-with-timpani-and-winchimes-reverberated-146260.wav diff --git a/media/icons8-eye-30.png b/chordspy/media/icons8-eye-30.png similarity index 100% rename from media/icons8-eye-30.png rename to chordspy/media/icons8-eye-30.png diff --git a/Notebooks/ecg.ipynb b/chordspy/notebooks/ecg.ipynb similarity index 100% rename from Notebooks/ecg.ipynb rename to chordspy/notebooks/ecg.ipynb diff --git a/Notebooks/emg.ipynb b/chordspy/notebooks/emg.ipynb similarity index 100% rename from Notebooks/emg.ipynb rename to chordspy/notebooks/emg.ipynb diff --git a/Notebooks/eog.ipynb b/chordspy/notebooks/eog.ipynb similarity index 100% rename from Notebooks/eog.ipynb rename to chordspy/notebooks/eog.ipynb diff --git a/static/script.js b/chordspy/static/script.js similarity index 96% rename from static/script.js rename to chordspy/static/script.js index ed1684e4..6a340615 100644 --- a/static/script.js +++ b/chordspy/static/script.js @@ -112,19 +112,42 @@ function renderApps(apps) {

${app.description}

+ `; - updateAppStatus(app.script); - card.addEventListener('click', async () => { - await handleAppClick(app, card); - }); - appGrid.appendChild(card); }); } +async function launchApp(appScript) { + try { + const response = await fetch('/launch_app', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ app: appScript }) + }); + + const result = await response.json(); + + if (result.status === 'error') { + if (result.code === 'ALREADY_RUNNING') { + alert(`Application is already running.`); + } else { + alert(`Failed to launch application: ${result.message}`); + } + } else { + console.log(`Launched ${appScript}`); + } + } catch (error) { + logError('Error launching app:', error); + } +} + async function handleAppClick(app, card) { const statusElement = document.getElementById(`status-${app.script}`); if (statusElement && !statusElement.classList.contains('hidden')) { @@ -926,8 +949,13 @@ checkStreamStatus(); // Initialize the app when DOM is loaded document.addEventListener('DOMContentLoaded', () => { -initializeApplication(); -window.onerror = function(message, source, lineno, colno, error) { + initializeApplication(); + checkStreamStatus(); + setInterval(checkStreamStatus, 1000); + startTimestampUpdater(); + + // Error handling + window.onerror = function(message, source, lineno, colno, error) { logError(error || message); return true; }; @@ -936,6 +964,6 @@ document.getElementById('github-btn').addEventListener('click', () => { }); document.getElementById('info-btn').addEventListener('click', () => { - alert('Chords Python - Biopotential Data Acquisition System\nVersion 2.1.0'); + alert('Chords Python - Bio-potential Data Acquisition System\nVersion 0.1.0'); }); }); \ No newline at end of file diff --git a/templates/index.html b/chordspy/templates/index.html similarity index 77% rename from templates/index.html rename to chordspy/templates/index.html index eaf842a1..ee79e77b 100644 --- a/templates/index.html +++ b/chordspy/templates/index.html @@ -1,25 +1,39 @@ + + Chords Python + + + - +
+ Chords Python +
+ + + @@ -27,23 +41,30 @@
- +
+
+ + +
+ @@ -57,8 +78,10 @@ Disconnecting... + +
+ +
+
- +