#!/usr/bin/env python3 # -*- coding:utf-8 -*- """ Virtual Terminal for E-Ink Display Spawns a virtual terminal, displays preview in console, and writes to e-ink display """ import sys import os import time import threading import pty import select import subprocess import shutil import contextlib import glob from PIL import Image, ImageDraw, ImageFont, ImageOps try: from evdev import InputDevice, categorize, ecodes, list_devices HAS_EVDEV = True except ImportError: HAS_EVDEV = False # Add the waveshare library path libdir = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'e-Paper/RaspberryPi_JetsonNano/python/lib') if os.path.exists(libdir): sys.path.append(libdir) try: import pyte except ImportError: print("Error: pyte library not found. Install with: pip3 install pyte") sys.exit(1) # Configuration for different display types DISPLAY_CONFIGS = { 'epd2in13_V4': { 'width': 122, 'height': 250, 'module': 'epd2in13_V4', 'char_width': 6, # Width of each character in pixels 'char_height': 12, # Height of each character in pixels }, 'epd2in7': { 'width': 176, 'height': 264, 'module': 'epd2in7', 'char_width': 6, 'char_height': 12, }, 'epd4in2': { 'width': 400, 'height': 300, 'module': 'epd4in2', 'char_width': 7, 'char_height': 14, 'font_size': 12, }, 'epd7in5_V2': { 'width': 800, 'height': 480, 'module': 'epd7in5_V2', 'char_width': 8, 'char_height': 16, }, } class VirtualTerminalEInk: def __init__(self, display_type='epd2in13_V4', use_display=True, zoom=1.0, show_preview=True): """ Initialize the virtual terminal for e-ink display Args: display_type: Type of e-ink display (see DISPLAY_CONFIGS) use_display: Whether to actually use the e-ink display (False for testing) zoom: Zoom multiplier for rendering (1.0 = normal, 2.0 = 2x larger text, etc.) show_preview: Whether to show ASCII preview in console output """ self.display_type = display_type self.use_display = use_display self.zoom = max(1.0, float(zoom)) # Ensure zoom is at least 1.0 self.show_preview = show_preview self.config = DISPLAY_CONFIGS.get(display_type) # Serialize console output (preview redraw + status messages) so it doesn't # get interleaved with background threads or noisy hardware drivers. self._console_lock = threading.RLock() if not self.config: raise ValueError(f"Unknown display type: {display_type}") # Load font and measure actual character dimensions self.font = self._load_font() self._measure_font_metrics() # Calculate terminal size in characters (accounting for zoom) # With zoom, we render to a smaller virtual terminal and scale it up self.cols = int(self.config['width'] / (self.char_width * self.zoom)) self.rows = int(self.config['height'] / (self.char_height * self.zoom)) self._safe_print(f"Display: {display_type} ({self.config['width']}x{self.config['height']})") self._safe_print(f"Font metrics: {self.char_width}x{self.char_height} pixels per character") self._safe_print(f"Terminal size: {self.cols}x{self.rows} characters") # Initialize pyte screen self.screen = pyte.Screen(self.cols, self.rows) self.stream = pyte.Stream(self.screen) # Track screen deltas for partial refresh. self._last_snapshot = None self._last_cursor = None self._partial_ready = False self._partial_enabled = True # Debounce display updates to batch rapid changes (typing) self._update_debounce_ms = 60 # milliseconds to wait before refreshing self._pending_update = False self._update_timer = None self._update_lock = threading.Lock() # Quadrant zoom feature (F5-F8 keys) self._quadrant_view = None # None, 'top_left', 'top_right', 'bottom_left', 'bottom_right' self._quadrant_changed = False # Flag to force refresh when quadrant changes # Initialize e-ink display self.epd = None if self.use_display: try: module = __import__(f'waveshare_epd.{self.config["module"]}', fromlist=[self.config['module']]) self.epd = module.EPD() self._safe_print("Initializing e-ink display...") with self._silence_console(): self.epd.init() # Call Clear without arguments for displays that don't take parameters try: self.epd.Clear(0xFF) except TypeError: self.epd.Clear() self._safe_print("E-ink display ready!") # If supported, switch the panel into partial-update mode so we can # update only the changed region (e.g., per typed character). self._prepare_partial_mode_if_supported() except Exception as e: self._safe_print(f"Warning: Could not initialize e-ink display: {e}") self._safe_print("Running in preview-only mode") self.use_display = False # Terminal process self.master_fd = None self.running = False # Direct keyboard input (headless mode) self.keyboard_device = None self.keyboard_path = None self.keyboard_reconnect_interval = 2.0 # seconds def _safe_print(self, *args, **kwargs): # When the terminal is put into raw-ish modes (see read_user_input), some # terminals stop translating '\n' into CRLF. Using CRLF avoids "stair-step" # indentation in the preview. with self._console_lock: end = kwargs.pop('end', '\r\n') flush = kwargs.pop('flush', True) print(*args, end=end, flush=flush, **kwargs) @contextlib.contextmanager def _silence_console(self): """Temporarily silence stdout/stderr (e.g., Waveshare driver prints).""" with open(os.devnull, 'w') as devnull: with contextlib.redirect_stdout(devnull), contextlib.redirect_stderr(devnull): yield def _epd_supports_partial_window(self): """Return True if the active driver exposes a windowed partial refresh API.""" if not self.epd: return False # epd4in2 exposes a dedicated region updater. if hasattr(self.epd, 'EPD_4IN2_PartialDisplay'): return True # Many other drivers expose display_Partial(Image, Xstart, Ystart, Xend, Yend) if hasattr(self.epd, 'display_Partial'): return True return False def _prepare_partial_mode_if_supported(self): if not (self.use_display and self.epd and self._partial_enabled): return if not self._epd_supports_partial_window(): return # Some drivers require switching LUT/mode via init_Partial. if hasattr(self.epd, 'init_Partial'): try: with self._silence_console(): self.epd.init_Partial() self._partial_ready = True except Exception: self._partial_ready = False def _snapshot_screen(self): """Return a compact snapshot of the current pyte screen state.""" snapshot = [] buf = self.screen.buffer for row in range(self.rows): row_cells = [] line = buf[row] for col in range(self.cols): ch = line[col] row_cells.append((ch.data, bool(getattr(ch, 'reverse', False)))) snapshot.append(row_cells) return snapshot def _compute_dirty_cell_bbox(self, prev_snapshot, curr_snapshot, prev_cursor, curr_cursor): """Compute bounding box of changed cells (inclusive). Returns (min_c, min_r, max_c, max_r) or None.""" if prev_snapshot is None: return (0, 0, self.cols - 1, self.rows - 1) min_c = None min_r = None max_c = None max_r = None for r in range(self.rows): prev_row = prev_snapshot[r] curr_row = curr_snapshot[r] for c in range(self.cols): if prev_row[c] != curr_row[c]: if min_c is None: min_c = max_c = c min_r = max_r = r else: min_c = min(min_c, c) max_c = max(max_c, c) min_r = min(min_r, r) max_r = max(max_r, r) # Always refresh old + new cursor cells so underline moves cleanly. for cur in (prev_cursor, curr_cursor): if not cur: continue c, r = cur if 0 <= c < self.cols and 0 <= r < self.rows: if min_c is None: min_c = max_c = c min_r = max_r = r else: min_c = min(min_c, c) max_c = max(max_c, c) min_r = min(min_r, r) max_r = max(max_r, r) if min_c is None: return None return (min_c, min_r, max_c, max_r) def _cell_bbox_to_pixel_window(self, cell_bbox): """Convert cell bbox to pixel window (x_start, y_start, x_end, y_end), end-exclusive.""" min_c, min_r, max_c, max_r = cell_bbox x_start = min_c * self.char_width y_start = min_r * self.char_height x_end = (max_c + 1) * self.char_width y_end = (max_r + 1) * self.char_height # Expand Y slightly to ensure cursor underline and descenders are fully captured y_end = min(y_end + 2, self.config['height']) # Clamp to display bounds. x_start = max(0, min(x_start, self.config['width'])) y_start = max(0, min(y_start, self.config['height'])) x_end = max(0, min(x_end, self.config['width'])) y_end = max(0, min(y_end, self.config['height'])) # Align X to byte boundaries (8px) for most controllers. x_start = (x_start // 8) * 8 x_end = ((x_end + 7) // 8) * 8 x_end = max(x_start + 8, min(x_end, self.config['width'])) # Ensure non-empty window. if y_end <= y_start: y_end = min(y_start + self.char_height, self.config['height']) return (x_start, y_start, x_end, y_end) def _find_keyboard_device(self): """Find a keyboard input device, preferring Bluetooth keyboards.""" if not HAS_EVDEV: return None try: devices = [InputDevice(path) for path in list_devices()] keyboards = [] for device in devices: # Check if device has keyboard capabilities caps = device.capabilities() if ecodes.EV_KEY in caps: # Check for typical keyboard keys keys = caps[ecodes.EV_KEY] has_letters = any(k in keys for k in range(ecodes.KEY_Q, ecodes.KEY_P + 1)) if has_letters: # Prefer Bluetooth keyboards is_bluetooth = 'bluetooth' in device.name.lower() or 'bt' in device.name.lower() keyboards.append((device, is_bluetooth)) if not keyboards: return None # Sort by Bluetooth preference keyboards.sort(key=lambda x: x[1], reverse=True) selected = keyboards[0][0] self._safe_print(f"Found keyboard: {selected.name} ({selected.path})") return selected except Exception as e: self._safe_print(f"Error finding keyboard: {e}") return None def _epd_sync_partial_base_from_buffer(self, buffer_bytes): """Keep driver-side 'old image' buffer in sync for partial updates.""" if not self.epd or not hasattr(self.epd, 'DATA'): return try: # The driver inverts the buffer itself during partial refresh, # so we store it as-is (not inverted). self.epd.DATA = list(buffer_bytes) except Exception: pass def _load_font(self): """Load a monospace font""" font_size = self.config.get('font_size', self.config['char_height'] - 2) # Try to use a monospace font, fall back to default if not available for font_path in [ '/usr/share/fonts/truetype/dejavu/DejaVuSansMono.ttf', '/usr/share/fonts/truetype/liberation/LiberationMono-Regular.ttf', '/usr/share/fonts/truetype/freefont/FreeMono.ttf', ]: try: return ImageFont.truetype(font_path, font_size) except: continue return ImageFont.load_default() def _measure_font_metrics(self): """Measure actual character dimensions from the loaded font""" # Create a temporary image to measure text temp_img = Image.new('1', (200, 100), 255) draw = ImageDraw.Draw(temp_img) # Measure multiple characters to get accurate width test_str = "MMMMMMMMMM" # 10 M's bbox = draw.textbbox((0, 0), test_str, font=self.font) avg_width = (bbox[2] - bbox[0]) / 10 # Measure height bbox_height = draw.textbbox((0, 0), 'M', font=self.font) height = bbox_height[3] - bbox_height[1] # Round up to ensure characters don't overlap self.char_width = int(avg_width) + 1 self.char_height = int(height) + 2 # Ensure minimum dimensions if self.char_width < 6: self.char_width = 6 if self.char_height < 10: self.char_height = 10 self._safe_print(f"DEBUG: Measured char dimensions - width: {self.char_width}, height: {self.char_height}") def render_to_image(self): """Render the virtual terminal screen to a PIL Image""" # Create a white image image = Image.new('1', (self.config['width'], self.config['height']), 255) draw = ImageDraw.Draw(image) # Draw each character for row in range(self.rows): for col in range(self.cols): char = self.screen.buffer[row][col] if char.data != ' ': x = col * self.char_width y = row * self.char_height # Draw character (0 = black, 255 = white) if char.reverse: # Inverted colors for reverse video draw.rectangle([(x, y), (x + self.char_width, y + self.char_height)], fill=0) draw.text((x, y), char.data, font=self.font, fill=255) else: draw.text((x, y), char.data, font=self.font, fill=0) # Draw cursor cursor_row = self.screen.cursor.y cursor_col = self.screen.cursor.x cursor_x = cursor_col * self.char_width cursor_y = cursor_row * self.char_height draw.rectangle([(cursor_x, cursor_y + self.char_height - 2), (cursor_x + self.char_width, cursor_y + self.char_height)], fill=0) return image def display_preview(self): """Display ASCII preview of the terminal in the console""" with self._console_lock: term_size = shutil.get_terminal_size(fallback=(80, 24)) term_cols = max(20, term_size.columns) term_lines = max(10, term_size.lines) # Keep a small bottom area for status lines. max_visible_rows = max(1, term_lines - 4) visible_rows = min(self.rows, max_visible_rows) # Clamp preview width to terminal width so the border never wraps. max_visible_cols = max(10, term_cols - 2) visible_cols = min(self.cols, max_visible_cols) cursor_col = self.screen.cursor.x cursor_row = self.screen.cursor.y # If the virtual terminal is wider than our real terminal, horizontally scroll # so the cursor stays visible. if visible_cols < self.cols: start_col = cursor_col - (visible_cols // 2) start_col = max(0, min(start_col, self.cols - visible_cols)) else: start_col = 0 # Clear screen and redraw in-place. sys.stdout.write("\x1b[2J\x1b[H") border = "=" * (visible_cols + 2) sys.stdout.write(border + "\r\n") for row in range(visible_rows): # Some pyte Line implementations support slicing; if not, fall back. try: row_slice = self.screen.buffer[row][start_col:start_col + visible_cols] line = "".join(ch.data for ch in row_slice) except Exception: line_chars = [] for col in range(start_col, start_col + visible_cols): line_chars.append(self.screen.buffer[row][col].data) line = "".join(line_chars) # Pad to keep the right border aligned. if len(line) < visible_cols: line = line.ljust(visible_cols) sys.stdout.write("|" + line + "|\r\n") sys.stdout.write(border + "\r\n") if start_col: sys.stdout.write( f"Cursor at: ({cursor_col}, {cursor_row}) view: cols {start_col}-{start_col + visible_cols - 1}\r\n" ) else: sys.stdout.write(f"Cursor at: ({cursor_col}, {cursor_row})\r\n") sys.stdout.flush() def _do_update_display(self): """Internal: Actually perform the display update (called after debounce).""" with self._console_lock: # Display preview in console (if enabled) if self.show_preview: self.display_preview() # Update e-ink display (silence driver prints to avoid tearing the preview) if self.use_display and self.epd: try: curr_snapshot = self._snapshot_screen() curr_cursor = (self.screen.cursor.x, self.screen.cursor.y) dirty_cells = self._compute_dirty_cell_bbox( self._last_snapshot, curr_snapshot, self._last_cursor, curr_cursor, ) # Force update if quadrant view changed (even if screen didn't change) if self._quadrant_changed: self._safe_print(f"[DEBUG] Forcing display update due to quadrant view change") dirty_cells = (0, 0, self.cols - 1, self.rows - 1) # Force full refresh self._quadrant_changed = False # Reset flag # Nothing changed -> skip panel update. if dirty_cells is None: self._last_snapshot = curr_snapshot self._last_cursor = curr_cursor return # Heuristic: large changes -> full refresh. min_c, min_r, max_c, max_r = dirty_cells changed_area = (max_c - min_c + 1) * (max_r - min_r + 1) total_area = self.cols * self.rows do_full = (self._last_snapshot is None) or (changed_area > total_area * 0.35) image = self.render_to_image() # Apply quadrant view if active (before zoom) if self._quadrant_view is not None: self._safe_print(f"[DEBUG] Applying quadrant view: {self._quadrant_view}") # Crop to the selected quadrant half_width = image.width // 2 half_height = image.height // 2 if self._quadrant_view == 'top_left': image = image.crop((0, 0, half_width, half_height)) elif self._quadrant_view == 'top_right': image = image.crop((half_width, 0, image.width, half_height)) elif self._quadrant_view == 'bottom_left': image = image.crop((0, half_height, half_width, image.height)) elif self._quadrant_view == 'bottom_right': image = image.crop((half_width, half_height, image.width, image.height)) self._safe_print(f"[DEBUG] Cropped to size: {image.size}, scaling to display: {self.config['width']}x{self.config['height']}") # Scale the quadrant to full display size image = image.resize((self.config['width'], self.config['height']), Image.NEAREST) # Apply zoom if needed (independent of quadrant view) if self.zoom > 1.0: new_width = int(image.width * self.zoom) new_height = int(image.height * self.zoom) # Use NEAREST for crisp text scaling image = image.resize((new_width, new_height), Image.NEAREST) # Crop to display size if zoomed image is larger if new_width > self.config['width'] or new_height > self.config['height']: image = image.crop((0, 0, self.config['width'], self.config['height'])) # Prefer partial window update when supported. if (not do_full) and self._partial_enabled and self._epd_supports_partial_window(): if not self._partial_ready: self._prepare_partial_mode_if_supported() x_start, y_start, x_end, y_end = self._cell_bbox_to_pixel_window(dirty_cells) # Partial refresh: EPD_4IN2_PartialDisplay inverts internally, so use raw buffer buffer_bytes = self.epd.getbuffer(image) with self._silence_console(): if hasattr(self.epd, 'EPD_4IN2_PartialDisplay'): # epd4in2 expects full-frame buffer and a pixel window. # The driver inverts and stores in DATA internally, so we don't sync here. self.epd.EPD_4IN2_PartialDisplay(x_start, y_start, x_end, y_end, buffer_bytes) elif hasattr(self.epd, 'display_Partial'): # Many drivers accept a PIL Image + window. If this driver # has a different signature, it will raise and we fall back. try: self.epd.display_Partial(image, x_start, y_start, x_end, y_end) except TypeError: # Some implementations only accept a single image. self.epd.display_Partial(image) # Sync for other drivers that may not maintain DATA self._epd_sync_partial_base_from_buffer(buffer_bytes) else: # Full refresh: display() expects inverted polarity image_inverted = ImageOps.invert(image.convert('L')).convert('1') buffer_bytes = self.epd.getbuffer(image_inverted) with self._silence_console(): self.epd.display(buffer_bytes) self._epd_sync_partial_base_from_buffer(buffer_bytes) # Re-enter partial mode after a full refresh if the driver supports it. self._partial_ready = False self._prepare_partial_mode_if_supported() self._last_snapshot = curr_snapshot self._last_cursor = curr_cursor except Exception as e: self._safe_print(f"Error updating e-ink display: {e}") def update_display(self): """Schedule a debounced display update to batch rapid changes together.""" with self._update_lock: # Cancel any pending timer if self._update_timer is not None: self._update_timer.cancel() # Schedule new update after debounce period self._update_timer = threading.Timer( self._update_debounce_ms / 1000.0, self._do_update_display ) self._update_timer.daemon = True self._update_timer.start() def read_terminal_output(self): """Read output from the terminal and feed it to pyte""" while self.running: # Check if there's data to read r, w, e = select.select([self.master_fd], [], [], 0.1) if self.master_fd in r: try: data = os.read(self.master_fd, 1024) if data: # Feed data to pyte stream self.stream.feed(data.decode('utf-8', errors='replace')) # Update display self.update_display() except OSError: break def read_user_input(self): """Read user input directly from keyboard device with auto-reconnection.""" if not HAS_EVDEV: self._safe_print("Warning: evdev not available, install with: pip3 install evdev") return # Proper QWERTY keyboard layout mapping based on actual evdev keycodes key_map = { # Special keys ecodes.KEY_ENTER: '\r', # Send carriage return (terminal driver handles conversion) ecodes.KEY_TAB: '\t', ecodes.KEY_SPACE: ' ', ecodes.KEY_BACKSPACE: '\x7f', ecodes.KEY_ESC: '\x1b', ecodes.KEY_MINUS: '-', ecodes.KEY_EQUAL: '=', ecodes.KEY_LEFTBRACE: '[', ecodes.KEY_RIGHTBRACE: ']', ecodes.KEY_SEMICOLON: ';', ecodes.KEY_APOSTROPHE: "'", ecodes.KEY_GRAVE: '`', ecodes.KEY_BACKSLASH: '\\', ecodes.KEY_COMMA: ',', ecodes.KEY_DOT: '.', ecodes.KEY_SLASH: '/', # Arrow keys (ANSI escape sequences for terminal navigation) ecodes.KEY_UP: '\x1b[A', ecodes.KEY_DOWN: '\x1b[B', ecodes.KEY_RIGHT: '\x1b[C', ecodes.KEY_LEFT: '\x1b[D', # Function keys (for quadrant zoom) ecodes.KEY_F5: '', ecodes.KEY_F6: '', ecodes.KEY_F7: '', ecodes.KEY_F8: '', # Number row (unshifted) ecodes.KEY_1: '1', ecodes.KEY_2: '2', ecodes.KEY_3: '3', ecodes.KEY_4: '4', ecodes.KEY_5: '5', ecodes.KEY_6: '6', ecodes.KEY_7: '7', ecodes.KEY_8: '8', ecodes.KEY_9: '9', ecodes.KEY_0: '0', # Letter keys (lowercase) ecodes.KEY_Q: 'q', ecodes.KEY_W: 'w', ecodes.KEY_E: 'e', ecodes.KEY_R: 'r', ecodes.KEY_T: 't', ecodes.KEY_Y: 'y', ecodes.KEY_U: 'u', ecodes.KEY_I: 'i', ecodes.KEY_O: 'o', ecodes.KEY_P: 'p', ecodes.KEY_A: 'a', ecodes.KEY_S: 's', ecodes.KEY_D: 'd', ecodes.KEY_F: 'f', ecodes.KEY_G: 'g', ecodes.KEY_H: 'h', ecodes.KEY_J: 'j', ecodes.KEY_K: 'k', ecodes.KEY_L: 'l', ecodes.KEY_Z: 'z', ecodes.KEY_X: 'x', ecodes.KEY_C: 'c', ecodes.KEY_V: 'v', ecodes.KEY_B: 'b', ecodes.KEY_N: 'n', ecodes.KEY_M: 'm', } # Shifted character mappings shift_map = { '1': '!', '2': '@', '3': '#', '4': '$', '5': '%', '6': '^', '7': '&', '8': '*', '9': '(', '0': ')', '-': '_', '=': '+', '[': '{', ']': '}', '\\': '|', ';': ':', "'": '"', '`': '~', ',': '<', '.': '>', '/': '?', } shift_pressed = False ctrl_pressed = False last_reconnect_attempt = 0 while self.running: # Try to connect/reconnect to keyboard if self.keyboard_device is None: current_time = time.time() if current_time - last_reconnect_attempt >= self.keyboard_reconnect_interval: last_reconnect_attempt = current_time self.keyboard_device = self._find_keyboard_device() if self.keyboard_device: self.keyboard_path = self.keyboard_device.path # Display connection status on screen self.stream.feed(f"\r\n[Keyboard connected: {self.keyboard_device.name}]\r\n") self.update_display() else: time.sleep(0.5) continue try: # Check device is still valid before select if self.keyboard_device is None: time.sleep(0.1) continue # Read keyboard events with timeout r, w, e = select.select([self.keyboard_device.fd], [], [], 0.1) if not r: continue for event in self.keyboard_device.read(): if event.type == ecodes.EV_KEY: key_event = categorize(event) # Debug: show all key events if event.value == 1: # Key press key_name = ecodes.KEY[event.code] if event.code in ecodes.KEY else f"UNKNOWN({event.code})" self._safe_print(f"[DEBUG] Key pressed: {key_name} (code: {event.code})") # Track modifier keys if event.code in (ecodes.KEY_LEFTSHIFT, ecodes.KEY_RIGHTSHIFT): shift_pressed = (event.value == 1) continue elif event.code in (ecodes.KEY_LEFTCTRL, ecodes.KEY_RIGHTCTRL): ctrl_pressed = (event.value == 1) continue # Only process key press (not release) if event.value != 1: continue # Handle Ctrl+letter combinations if ctrl_pressed: # Map letter keys to Ctrl codes (Ctrl+A = 1, Ctrl+B = 2, ..., Ctrl+Z = 26) ctrl_letter_map = { ecodes.KEY_A: 1, ecodes.KEY_B: 2, ecodes.KEY_C: 3, ecodes.KEY_D: 4, ecodes.KEY_E: 5, ecodes.KEY_F: 6, ecodes.KEY_G: 7, ecodes.KEY_H: 8, ecodes.KEY_I: 9, ecodes.KEY_J: 10, ecodes.KEY_K: 11, ecodes.KEY_L: 12, ecodes.KEY_M: 13, ecodes.KEY_N: 14, ecodes.KEY_O: 15, ecodes.KEY_P: 16, ecodes.KEY_Q: 17, ecodes.KEY_R: 18, ecodes.KEY_S: 19, ecodes.KEY_T: 20, ecodes.KEY_U: 21, ecodes.KEY_V: 22, ecodes.KEY_W: 23, ecodes.KEY_X: 24, ecodes.KEY_Y: 25, ecodes.KEY_Z: 26, } if event.code in ctrl_letter_map: ctrl_char = chr(ctrl_letter_map[event.code]) os.write(self.master_fd, ctrl_char.encode()) # Ctrl+D exits the terminal if event.code == ecodes.KEY_D: self.stream.feed("\r\n[Exiting...]\r\n") self.update_display() self.running = False break continue # Handle F5-F8 for quadrant zoom if event.code == ecodes.KEY_F5: self._safe_print(f"[DEBUG] F5 pressed - zooming to top-left quadrant") self._quadrant_view = 'top_left' self._quadrant_changed = True self.update_display() continue elif event.code == ecodes.KEY_F6: self._safe_print(f"[DEBUG] F6 pressed - zooming to top-right quadrant") self._quadrant_view = 'top_right' self._quadrant_changed = True self.update_display() continue elif event.code == ecodes.KEY_F7: self._safe_print(f"[DEBUG] F7 pressed - zooming to bottom-left quadrant") self._quadrant_view = 'bottom_left' self._quadrant_changed = True self.update_display() continue elif event.code == ecodes.KEY_F8: self._safe_print(f"[DEBUG] F8 pressed - zooming to bottom-right quadrant") self._quadrant_view = 'bottom_right' self._quadrant_changed = True self.update_display() continue # Map key to character char = key_map.get(event.code) if char: # Reset quadrant view on any normal keypress if self._quadrant_view is not None: self._safe_print(f"[DEBUG] Resetting quadrant view to full screen") self._quadrant_view = None self._quadrant_changed = True self.update_display() # Apply shift modifier (ctrl is handled separately above) if shift_pressed and not ctrl_pressed: if char.isalpha(): char = char.upper() elif char in shift_map: char = shift_map[char] # Send to PTY os.write(self.master_fd, char.encode()) except (OSError, IOError) as e: # Keyboard disconnected if self.keyboard_device: self._safe_print(f"Keyboard disconnected, will retry...") self.stream.feed("\r\n[Keyboard disconnected]\r\n") self.update_display() try: self.keyboard_device.close() except: pass self.keyboard_device = None time.sleep(0.5) def spawn_shell(self, command=None): """Spawn a shell or run a command in the virtual terminal""" if command is None: command = os.environ.get('SHELL', '/bin/bash') # Set up environment env = os.environ.copy() env['TERM'] = 'linux' env['COLUMNS'] = str(self.cols) env['LINES'] = str(self.rows) # Create a pseudo-terminal pid, self.master_fd = pty.fork() if pid == 0: # Child process # Execute the shell os.execvpe(command, [command], env) else: # Parent process self.running = True # Start reading thread for terminal output read_thread = threading.Thread(target=self.read_terminal_output) read_thread.daemon = True read_thread.start() # Start input thread for user input input_thread = threading.Thread(target=self.read_user_input) input_thread.daemon = True input_thread.start() self._safe_print(f"\nVirtual terminal started (PID: {pid})") self._safe_print("Terminal output will be displayed above and on the e-ink display") self._safe_print("Type commands normally - they will be sent to the virtual terminal") self._safe_print("Press Ctrl+C or Ctrl+D to exit\n") try: while True: # Check if child process is still running try: pid_status, status = os.waitpid(pid, os.WNOHANG) if pid_status != 0: break except ChildProcessError: break time.sleep(0.5) except KeyboardInterrupt: self._safe_print("\nShutting down...") finally: self.running = False # Cancel any pending update and do final refresh with self._update_lock: if self._update_timer is not None: self._update_timer.cancel() self._update_timer = None self._do_update_display() # Give threads time to finish time.sleep(0.2) os.close(self.master_fd) if self.use_display and self.epd: try: self._safe_print("Putting e-ink display to sleep...") with self._silence_console(): self.epd.sleep() except: pass def run_command(self, command): """Run a single command and display output""" self.running = True try: # Run the command and capture output result = subprocess.run(command, shell=True, capture_output=True, text=True, timeout=30) # Feed output to pyte output = result.stdout + result.stderr self.stream.feed(output) # Update display (force immediate for single commands) self._do_update_display() except subprocess.TimeoutExpired: self._safe_print("Command timed out") except Exception as e: self._safe_print(f"Error running command: {e}") finally: self.running = False if self.use_display and self.epd: try: with self._silence_console(): self.epd.sleep() except: pass def main(): import argparse parser = argparse.ArgumentParser(description='Virtual Terminal for E-Ink Display') parser.add_argument('-d', '--display', choices=list(DISPLAY_CONFIGS.keys()), default='epd4in2', help='E-ink display type') parser.add_argument('-n', '--no-display', action='store_true', help='Preview only, do not use actual e-ink display') parser.add_argument('-c', '--command', help='Run a single command instead of spawning a shell') parser.add_argument('-s', '--shell', help='Shell to use (default: $SHELL or /bin/bash)') parser.add_argument('-z', '--zoom', type=float, default=1.0, help='Zoom multiplier for text size (e.g., 2.0 for 2x larger text)') parser.add_argument('-p', '--no-preview', action='store_true', help='Disable ASCII preview in console output (useful for service mode)') args = parser.parse_args() # Create virtual terminal vt = VirtualTerminalEInk( display_type=args.display, use_display=not args.no_display, zoom=args.zoom, show_preview=not args.no_preview ) if args.command: # Run single command vt.run_command(args.command) else: # Spawn interactive shell shell = args.shell or os.environ.get('SHELL', '/bin/bash') vt.spawn_shell(shell) if __name__ == '__main__': main()