eink-linux-pda/virtual_terminal_eink.py

952 lines
41 KiB
Python
Raw Permalink Normal View History

2026-01-15 10:21:39 -05:00
#!/usr/bin/env python3
# -*- coding:utf-8 -*-
"""
Virtual Terminal for E-Ink Display
Spawns a virtual terminal, displays preview in console, and writes to e-ink display
"""
import codecs
import contextlib
import fcntl
import glob
import importlib
import math
import os
import pty
import select
import shutil
import struct
import subprocess
import sys
import termios
import threading
import time

from PIL import Image, ImageDraw, ImageFont, ImageOps
# evdev is optional: without it the program still runs, but direct keyboard
# input (read_user_input) is disabled.
try:
    from evdev import InputDevice, categorize, ecodes, list_devices
    HAS_EVDEV = True
except ImportError:
    HAS_EVDEV = False

# Add the waveshare library path (only when the bundled checkout is present).
libdir = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'e-Paper/RaspberryPi_JetsonNano/python/lib')
if os.path.exists(libdir):
    sys.path.append(libdir)

# pyte provides the terminal emulation; nothing works without it, so bail out
# early with a hint. The message goes to stderr because it is an error, not
# program output.
try:
    import pyte
except ImportError:
    print("Error: pyte library not found. Install with: pip3 install pyte", file=sys.stderr)
    sys.exit(1)
# Per-panel settings, keyed by the Waveshare driver name.
#   width / height            panel resolution in pixels
#   module                    driver module name inside the waveshare_epd package
#   char_width / char_height  nominal character cell size in pixels
#   font_size                 optional explicit font size; when absent the font
#                             loader derives one from char_height
DISPLAY_CONFIGS = {
    'epd2in13_V4': {
        'width': 122,
        'height': 250,
        'module': 'epd2in13_V4',
        'char_width': 6,
        'char_height': 12,
    },
    'epd2in7': {
        'width': 176,
        'height': 264,
        'module': 'epd2in7',
        'char_width': 6,
        'char_height': 12,
    },
    'epd4in2': {
        'width': 400,
        'height': 300,
        'module': 'epd4in2',
        'char_width': 7,
        'char_height': 14,
        'font_size': 12,
    },
    'epd7in5_V2': {
        'width': 800,
        'height': 480,
        'module': 'epd7in5_V2',
        'char_width': 8,
        'char_height': 16,
    },
}
class VirtualTerminalEInk:
    """A pyte-backed virtual terminal mirrored onto a Waveshare e-ink panel.

    A shell (or a single command) runs behind a pseudo-terminal; its output is
    fed into a pyte screen, which is rendered to a 1-bit PIL image and pushed
    to the panel. Keyboard input is read straight from an evdev device so the
    program can run headless. An optional ASCII preview of the screen is drawn
    in the controlling console.
    """

    # Characters produced by Shift on a US QWERTY layout for non-letter keys.
    _SHIFT_MAP = {
        '1': '!', '2': '@', '3': '#', '4': '$', '5': '%',
        '6': '^', '7': '&', '8': '*', '9': '(', '0': ')',
        '-': '_', '=': '+', '[': '{', ']': '}', '\\': '|',
        ';': ':', "'": '"', '`': '~', ',': '<', '.': '>', '/': '?',
    }

    # Fraction of the screen that may change before a full refresh is used
    # instead of a partial window update.
    _FULL_REFRESH_RATIO = 0.35

    def __init__(self, display_type='epd2in13_V4', use_display=True, zoom=1.0, show_preview=True):
        """
        Initialize the virtual terminal for e-ink display

        Args:
            display_type: Type of e-ink display (see DISPLAY_CONFIGS)
            use_display: Whether to actually use the e-ink display (False for testing)
            zoom: Zoom multiplier for rendering (1.0 = normal, 2.0 = 2x larger text, etc.)
            show_preview: Whether to show ASCII preview in console output

        Raises:
            ValueError: if display_type is not a key of DISPLAY_CONFIGS.
        """
        self.display_type = display_type
        self.use_display = use_display
        self.zoom = max(1.0, float(zoom))  # Ensure zoom is at least 1.0
        self.show_preview = show_preview

        # Serialize console output (preview redraw + status messages) so it doesn't
        # get interleaved with background threads or noisy hardware drivers.
        self._console_lock = threading.RLock()

        self.config = DISPLAY_CONFIGS.get(display_type)
        if not self.config:
            raise ValueError(f"Unknown display type: {display_type}")

        # Load font and measure actual character dimensions
        self.font = self._load_font()
        self._measure_font_metrics()

        # Calculate terminal size in characters (accounting for zoom).
        # With zoom, we render to a smaller virtual terminal and scale it up.
        # Clamp to 1 so an extreme zoom never yields a zero-sized screen.
        self.cols = max(1, int(self.config['width'] / (self.char_width * self.zoom)))
        self.rows = max(1, int(self.config['height'] / (self.char_height * self.zoom)))
        self._safe_print(f"Display: {display_type} ({self.config['width']}x{self.config['height']})")
        self._safe_print(f"Font metrics: {self.char_width}x{self.char_height} pixels per character")
        self._safe_print(f"Terminal size: {self.cols}x{self.rows} characters")

        # Initialize pyte screen
        self.screen = pyte.Screen(self.cols, self.rows)
        self.stream = pyte.Stream(self.screen)

        # Track screen deltas for partial refresh.
        self._last_snapshot = None
        self._last_cursor = None
        self._partial_ready = False
        self._partial_enabled = True

        # Debounce display updates to batch rapid changes (typing)
        self._update_debounce_ms = 60  # milliseconds to wait before refreshing
        self._pending_update = False
        self._update_timer = None
        self._update_lock = threading.Lock()

        # Quadrant zoom feature (F5-F8 keys)
        self._quadrant_view = None  # None, 'top_left', 'top_right', 'bottom_left', 'bottom_right'
        self._quadrant_changed = False  # Flag to force refresh when quadrant changes

        # Terminal process
        self.master_fd = None
        self.running = False

        # Direct keyboard input (headless mode)
        self.keyboard_device = None
        self.keyboard_path = None
        self.keyboard_reconnect_interval = 2.0  # seconds

        # Initialize e-ink display
        self.epd = None
        if self.use_display:
            self._init_display()

    def _init_display(self):
        """Import and initialise the panel driver; fall back to preview-only on failure."""
        try:
            module = importlib.import_module(f'waveshare_epd.{self.config["module"]}')
            self.epd = module.EPD()
            self._safe_print("Initializing e-ink display...")
            with self._silence_console():
                self.epd.init()
                # Call Clear without arguments for displays that don't take parameters
                try:
                    self.epd.Clear(0xFF)
                except TypeError:
                    self.epd.Clear()
            self._safe_print("E-ink display ready!")
            # If supported, switch the panel into partial-update mode so we can
            # update only the changed region (e.g., per typed character).
            self._prepare_partial_mode_if_supported()
        except Exception as e:
            # Deliberately broad: a missing driver, missing SPI/GPIO access or a
            # hardware fault should all degrade to preview-only mode.
            self._safe_print(f"Warning: Could not initialize e-ink display: {e}")
            self._safe_print("Running in preview-only mode")
            self.use_display = False

    def _safe_print(self, *args, **kwargs):
        """print() wrapper that holds the console lock and defaults to CRLF line ends."""
        # When the terminal is put into raw-ish modes (see read_user_input), some
        # terminals stop translating '\n' into CRLF. Using CRLF avoids "stair-step"
        # indentation in the preview.
        with self._console_lock:
            end = kwargs.pop('end', '\r\n')
            flush = kwargs.pop('flush', True)
            print(*args, end=end, flush=flush, **kwargs)

    @contextlib.contextmanager
    def _silence_console(self):
        """Temporarily silence stdout/stderr (e.g., Waveshare driver prints)."""
        with open(os.devnull, 'w') as devnull:
            with contextlib.redirect_stdout(devnull), contextlib.redirect_stderr(devnull):
                yield

    def _epd_supports_partial_window(self):
        """Return True if the active driver exposes a windowed partial refresh API."""
        if not self.epd:
            return False
        # epd4in2 exposes a dedicated region updater; many other drivers expose
        # display_Partial(Image, Xstart, Ystart, Xend, Yend).
        return any(
            hasattr(self.epd, method)
            for method in ('EPD_4IN2_PartialDisplay', 'display_Partial')
        )

    def _prepare_partial_mode_if_supported(self):
        """Put the panel into partial-update mode when the driver offers one.

        Sets self._partial_ready to reflect whether partial updates can be used.
        """
        if not (self.use_display and self.epd and self._partial_enabled):
            return
        if not self._epd_supports_partial_window():
            return
        # Some drivers require switching LUT/mode via init_Partial.
        init_partial = getattr(self.epd, 'init_Partial', None)
        if init_partial is None:
            # Nothing to switch; mark ready so we do not re-probe on every update.
            self._partial_ready = True
            return
        try:
            with self._silence_console():
                init_partial()
            self._partial_ready = True
        except Exception:
            self._partial_ready = False

    def _snapshot_screen(self):
        """Return a compact snapshot of the current pyte screen state.

        The snapshot is a list of rows, each a list of (character, reverse)
        tuples, which is everything render_to_image() draws for a cell.
        """
        buf = self.screen.buffer
        snapshot = []
        for row in range(self.rows):
            line = buf[row]
            snapshot.append([
                (line[col].data, bool(getattr(line[col], 'reverse', False)))
                for col in range(self.cols)
            ])
        return snapshot

    def _compute_dirty_cell_bbox(self, prev_snapshot, curr_snapshot, prev_cursor, curr_cursor):
        """Compute bounding box of changed cells (inclusive).

        Args:
            prev_snapshot: snapshot from the previous update, or None for the first one.
            curr_snapshot: snapshot of the current screen.
            prev_cursor: previous (col, row) cursor position, or None.
            curr_cursor: current (col, row) cursor position, or None.

        Returns:
            (min_c, min_r, max_c, max_r), or None when nothing changed.
        """
        if prev_snapshot is None:
            return (0, 0, self.cols - 1, self.rows - 1)

        touched = []
        for r in range(self.rows):
            prev_row = prev_snapshot[r]
            curr_row = curr_snapshot[r]
            if prev_row == curr_row:
                continue  # whole row unchanged, skip the per-cell scan
            touched.extend(
                (c, r) for c in range(self.cols) if prev_row[c] != curr_row[c]
            )

        # Always refresh old + new cursor cells so underline moves cleanly.
        for cursor in (prev_cursor, curr_cursor):
            if not cursor:
                continue
            c, r = cursor
            if 0 <= c < self.cols and 0 <= r < self.rows:
                touched.append((c, r))

        if not touched:
            return None
        cols_hit = [c for c, _ in touched]
        rows_hit = [r for _, r in touched]
        return (min(cols_hit), min(rows_hit), max(cols_hit), max(rows_hit))

    def _cell_bbox_to_pixel_window(self, cell_bbox):
        """Convert cell bbox to pixel window (x_start, y_start, x_end, y_end), end-exclusive.

        Coordinates are in panel pixels, i.e. after the zoom scaling applied to
        the rendered image, so the window lines up with what is actually sent.
        """
        min_c, min_r, max_c, max_r = cell_bbox
        width = self.config['width']
        height = self.config['height']
        # The rendered frame is enlarged by `zoom` before it reaches the panel,
        # so a cell occupies char_width*zoom x char_height*zoom panel pixels.
        cell_w = self.char_width * self.zoom
        cell_h = self.char_height * self.zoom

        x_start = math.floor(min_c * cell_w)
        y_start = math.floor(min_r * cell_h)
        x_end = math.ceil((max_c + 1) * cell_w)
        # Expand Y slightly to ensure cursor underline and descenders are fully captured
        y_end = math.ceil((max_r + 1) * cell_h) + math.ceil(2 * self.zoom)

        # Clamp to display bounds.
        x_start = max(0, min(x_start, width))
        y_start = max(0, min(y_start, height))
        x_end = max(0, min(x_end, width))
        y_end = max(0, min(y_end, height))

        # Align X to byte boundaries (8px) for most controllers.
        x_start = (x_start // 8) * 8
        x_end = ((x_end + 7) // 8) * 8
        x_end = max(x_start + 8, min(x_end, width))

        # Ensure non-empty window.
        if y_end <= y_start:
            y_end = min(y_start + self.char_height, height)
        return (x_start, y_start, x_end, y_end)

    def _find_keyboard_device(self):
        """Find a keyboard input device, preferring Bluetooth keyboards.

        Returns:
            An open evdev InputDevice, or None when evdev is unavailable or no
            keyboard-like device is found. Every device opened during the scan
            other than the returned one is closed again.
        """
        if not HAS_EVDEV:
            return None

        opened = []     # every device we opened, so leftovers can be closed
        selected = None
        try:
            keyboards = []
            for path in list_devices():
                try:
                    device = InputDevice(path)
                except OSError:
                    # Unreadable or vanished device: skip it, keep scanning.
                    continue
                opened.append(device)
                keys = device.capabilities().get(ecodes.EV_KEY)
                if not keys:
                    continue
                # Check for typical keyboard keys (the Q..P row)
                if any(k in keys for k in range(ecodes.KEY_Q, ecodes.KEY_P + 1)):
                    name = device.name.lower()
                    keyboards.append((device, 'bluetooth' in name or 'bt' in name))

            if keyboards:
                # First Bluetooth keyboard wins, otherwise the first keyboard found.
                selected = next(
                    (dev for dev, is_bluetooth in keyboards if is_bluetooth),
                    keyboards[0][0],
                )
                self._safe_print(f"Found keyboard: {selected.name} ({selected.path})")
            return selected
        except Exception as e:
            self._safe_print(f"Error finding keyboard: {e}")
            selected = None
            return None
        finally:
            # Do not leak a file descriptor per scanned device on every retry.
            for device in opened:
                if device is not selected:
                    try:
                        device.close()
                    except Exception:
                        pass

    def _epd_sync_partial_base_from_buffer(self, buffer_bytes):
        """Keep driver-side 'old image' buffer in sync for partial updates."""
        if not self.epd or not hasattr(self.epd, 'DATA'):
            return
        try:
            # The driver inverts the buffer itself during partial refresh,
            # so we store it as-is (not inverted).
            self.epd.DATA = list(buffer_bytes)
        except Exception:
            # Best-effort: a driver that rejects the assignment simply keeps
            # its own base image.
            pass

    def _load_font(self):
        """Load a monospace font, falling back to PIL's built-in default."""
        font_size = self.config.get('font_size', self.config['char_height'] - 2)
        # Try to use a monospace font, fall back to default if not available
        candidates = (
            '/usr/share/fonts/truetype/dejavu/DejaVuSansMono.ttf',
            '/usr/share/fonts/truetype/liberation/LiberationMono-Regular.ttf',
            '/usr/share/fonts/truetype/freefont/FreeMono.ttf',
        )
        for font_path in candidates:
            try:
                return ImageFont.truetype(font_path, font_size)
            except (OSError, ImportError):
                # OSError: file missing/unreadable; ImportError: PIL built
                # without FreeType support.
                continue
        return ImageFont.load_default()

    def _measure_font_metrics(self):
        """Measure actual character dimensions from the loaded font.

        Sets self.char_width and self.char_height (pixels), with minimums of
        6 and 10 respectively.
        """
        # Create a temporary image to measure text
        scratch = Image.new('1', (200, 100), 255)
        draw = ImageDraw.Draw(scratch)

        # Measure multiple characters to get accurate width
        sample = "MMMMMMMMMM"  # 10 M's
        left, _, right, _ = draw.textbbox((0, 0), sample, font=self.font)
        avg_width = (right - left) / 10

        # Measure height
        _, top, _, bottom = draw.textbbox((0, 0), 'M', font=self.font)
        height = bottom - top

        # Round up to ensure characters don't overlap, then enforce minimums
        self.char_width = max(6, int(avg_width) + 1)
        self.char_height = max(10, int(height) + 2)
        self._safe_print(f"DEBUG: Measured char dimensions - width: {self.char_width}, height: {self.char_height}")

    def render_to_image(self):
        """Render the virtual terminal screen to a PIL Image.

        Returns:
            A mode '1' image of the panel's size (0 = black, 255 = white) with
            the text at unzoomed scale and the cursor drawn as an underline.
        """
        # Create a white image
        image = Image.new('1', (self.config['width'], self.config['height']), 255)
        draw = ImageDraw.Draw(image)
        cell_w = self.char_width
        cell_h = self.char_height

        # Draw each character
        for row in range(self.rows):
            line = self.screen.buffer[row]
            y = row * cell_h
            for col in range(self.cols):
                cell = line[col]
                x = col * cell_w
                if cell.reverse:
                    # Reverse video: paint the cell background even when the
                    # character is a space, otherwise highlighted bars vanish.
                    # Rectangle endpoints are inclusive, hence the -1.
                    draw.rectangle([(x, y), (x + cell_w - 1, y + cell_h - 1)], fill=0)
                    if cell.data != ' ':
                        draw.text((x, y), cell.data, font=self.font, fill=255)
                elif cell.data != ' ':
                    draw.text((x, y), cell.data, font=self.font, fill=0)

        # Draw cursor as an underline at the bottom of its cell
        cursor_x = self.screen.cursor.x * cell_w
        cursor_y = self.screen.cursor.y * cell_h
        draw.rectangle([(cursor_x, cursor_y + cell_h - 2),
                        (cursor_x + cell_w, cursor_y + cell_h)],
                       fill=0)
        return image

    def display_preview(self):
        """Display ASCII preview of the terminal in the console"""
        with self._console_lock:
            term_size = shutil.get_terminal_size(fallback=(80, 24))
            term_cols = max(20, term_size.columns)
            term_lines = max(10, term_size.lines)

            # Keep a small bottom area for status lines.
            visible_rows = min(self.rows, max(1, term_lines - 4))
            # Clamp preview width to terminal width so the border never wraps.
            visible_cols = min(self.cols, max(10, term_cols - 2))

            cursor_col = self.screen.cursor.x
            cursor_row = self.screen.cursor.y

            # If the virtual terminal is wider than our real terminal, horizontally scroll
            # so the cursor stays visible.
            start_col = 0
            if visible_cols < self.cols:
                start_col = cursor_col - (visible_cols // 2)
                start_col = max(0, min(start_col, self.cols - visible_cols))

            border = "=" * (visible_cols + 2)
            # Clear screen and redraw in-place.
            out = ["\x1b[2J\x1b[H", border, "\r\n"]
            for row in range(visible_rows):
                buf_row = self.screen.buffer[row]
                # pyte rows are mapping-like, so index cell by cell rather than slicing.
                line = "".join(
                    buf_row[col].data for col in range(start_col, start_col + visible_cols)
                )
                # Pad to keep the right border aligned.
                out.append("|" + line.ljust(visible_cols) + "|\r\n")
            out.append(border + "\r\n")
            if start_col:
                out.append(
                    f"Cursor at: ({cursor_col}, {cursor_row})  view: cols {start_col}-{start_col + visible_cols - 1}\r\n"
                )
            else:
                out.append(f"Cursor at: ({cursor_col}, {cursor_row})\r\n")
            sys.stdout.write("".join(out))
            sys.stdout.flush()

    def _do_update_display(self):
        """Internal: Actually perform the display update (called after debounce)."""
        with self._console_lock:
            # Display preview in console (if enabled)
            if self.show_preview:
                self.display_preview()
            if not (self.use_display and self.epd):
                return
            try:
                self._refresh_panel()
            except Exception as e:
                # A driver hiccup must not kill the update thread.
                self._safe_print(f"Error updating e-ink display: {e}")

    def _refresh_panel(self):
        """Work out what changed and push a partial or full frame to the panel."""
        curr_snapshot = self._snapshot_screen()
        curr_cursor = (self.screen.cursor.x, self.screen.cursor.y)
        dirty_cells = self._compute_dirty_cell_bbox(
            self._last_snapshot,
            curr_snapshot,
            self._last_cursor,
            curr_cursor,
        )

        # Force update if quadrant view changed (even if screen didn't change)
        if self._quadrant_changed:
            self._safe_print("[DEBUG] Forcing display update due to quadrant view change")
            dirty_cells = (0, 0, self.cols - 1, self.rows - 1)  # Force full refresh
            self._quadrant_changed = False  # Reset flag

        # Nothing changed -> skip panel update.
        if dirty_cells is None:
            self._last_snapshot = curr_snapshot
            self._last_cursor = curr_cursor
            return

        # Heuristic: large changes -> full refresh.
        min_c, min_r, max_c, max_r = dirty_cells
        changed_area = (max_c - min_c + 1) * (max_r - min_r + 1)
        total_area = self.cols * self.rows
        do_full = (
            self._last_snapshot is None
            # In a quadrant view the frame is cropped and rescaled, so cell
            # coordinates no longer map onto panel pixels; refresh everything.
            or self._quadrant_view is not None
            or changed_area > total_area * self._FULL_REFRESH_RATIO
        )

        image = self._compose_frame()

        # Prefer partial window update when supported.
        if (not do_full) and self._partial_enabled and self._epd_supports_partial_window():
            self._push_partial(image, dirty_cells)
        else:
            self._push_full(image)

        self._last_snapshot = curr_snapshot
        self._last_cursor = curr_cursor

    def _compose_frame(self):
        """Render the screen and apply the quadrant view and zoom transforms."""
        image = self.render_to_image()
        # Apply quadrant view if active (before zoom)
        if self._quadrant_view is not None:
            image = self._apply_quadrant_view(image)
        # Apply zoom if needed (independent of quadrant view)
        if self.zoom > 1.0:
            image = self._apply_zoom(image)
        return image

    def _apply_quadrant_view(self, image):
        """Crop image to the selected quadrant and scale it to the full panel size."""
        self._safe_print(f"[DEBUG] Applying quadrant view: {self._quadrant_view}")
        half_width = image.width // 2
        half_height = image.height // 2
        boxes = {
            'top_left': (0, 0, half_width, half_height),
            'top_right': (half_width, 0, image.width, half_height),
            'bottom_left': (0, half_height, half_width, image.height),
            'bottom_right': (half_width, half_height, image.width, image.height),
        }
        box = boxes.get(self._quadrant_view)
        if box is not None:
            image = image.crop(box)
        self._safe_print(f"[DEBUG] Cropped to size: {image.size}, scaling to display: {self.config['width']}x{self.config['height']}")
        # Scale the quadrant to full display size
        return image.resize((self.config['width'], self.config['height']), Image.NEAREST)

    def _apply_zoom(self, image):
        """Enlarge image by self.zoom and crop the result back to the panel size."""
        new_width = int(image.width * self.zoom)
        new_height = int(image.height * self.zoom)
        # Use NEAREST for crisp text scaling
        image = image.resize((new_width, new_height), Image.NEAREST)
        # Crop to display size if zoomed image is larger
        if new_width > self.config['width'] or new_height > self.config['height']:
            image = image.crop((0, 0, self.config['width'], self.config['height']))
        return image

    def _push_partial(self, image, dirty_cells):
        """Send only the pixel window covering dirty_cells to the panel."""
        if not self._partial_ready:
            self._prepare_partial_mode_if_supported()
        x_start, y_start, x_end, y_end = self._cell_bbox_to_pixel_window(dirty_cells)
        # Partial refresh: EPD_4IN2_PartialDisplay inverts internally, so use raw buffer
        buffer_bytes = self.epd.getbuffer(image)
        with self._silence_console():
            if hasattr(self.epd, 'EPD_4IN2_PartialDisplay'):
                # epd4in2 expects full-frame buffer and a pixel window.
                # The driver inverts and stores in DATA internally, so we don't sync here.
                self.epd.EPD_4IN2_PartialDisplay(x_start, y_start, x_end, y_end, buffer_bytes)
            elif hasattr(self.epd, 'display_Partial'):
                # Many drivers accept a PIL Image + window. If this driver
                # has a different signature, it will raise and we fall back.
                try:
                    self.epd.display_Partial(image, x_start, y_start, x_end, y_end)
                except TypeError:
                    # Some implementations only accept a single image.
                    self.epd.display_Partial(image)
                # Sync for other drivers that may not maintain DATA
                self._epd_sync_partial_base_from_buffer(buffer_bytes)

    def _push_full(self, image):
        """Send a complete frame to the panel, then re-arm partial mode."""
        # Full refresh: display() expects inverted polarity
        image_inverted = ImageOps.invert(image.convert('L')).convert('1')
        buffer_bytes = self.epd.getbuffer(image_inverted)
        with self._silence_console():
            self.epd.display(buffer_bytes)
        self._epd_sync_partial_base_from_buffer(buffer_bytes)
        # Re-enter partial mode after a full refresh if the driver supports it.
        self._partial_ready = False
        self._prepare_partial_mode_if_supported()

    def update_display(self):
        """Schedule a debounced display update to batch rapid changes together."""
        delay = self._update_debounce_ms / 1000.0
        with self._update_lock:
            # Cancel any pending timer, then schedule a fresh one.
            if self._update_timer is not None:
                self._update_timer.cancel()
            timer = threading.Timer(delay, self._do_update_display)
            timer.daemon = True
            self._update_timer = timer
            timer.start()

    def read_terminal_output(self):
        """Read output from the terminal and feed it to pyte.

        Runs until self.running is cleared, the PTY reports EOF, or the PTY
        file descriptor becomes unusable.
        """
        # Decode incrementally so a multi-byte UTF-8 character split across
        # two reads is not turned into replacement characters.
        decoder = codecs.getincrementaldecoder('utf-8')(errors='replace')
        while self.running:
            try:
                # Check if there's data to read
                readable, _, _ = select.select([self.master_fd], [], [], 0.1)
            except (OSError, ValueError):
                break  # fd was closed underneath us during shutdown
            if self.master_fd not in readable:
                continue
            try:
                data = os.read(self.master_fd, 1024)
            except OSError:
                break
            if not data:
                break  # EOF: the other end is gone, do not spin on select()
            text = decoder.decode(data)
            if text:
                # Feed data to pyte stream and schedule a refresh
                self.stream.feed(text)
                self.update_display()

    @staticmethod
    def _build_key_map():
        """Return the evdev keycode -> unshifted character/escape-sequence table (US QWERTY)."""
        key_map = {
            # Special keys
            ecodes.KEY_ENTER: '\r',  # Send carriage return (terminal driver handles conversion)
            ecodes.KEY_TAB: '\t',
            ecodes.KEY_SPACE: ' ',
            ecodes.KEY_BACKSPACE: '\x7f',
            ecodes.KEY_ESC: '\x1b',
            ecodes.KEY_MINUS: '-',
            ecodes.KEY_EQUAL: '=',
            ecodes.KEY_LEFTBRACE: '[',
            ecodes.KEY_RIGHTBRACE: ']',
            ecodes.KEY_SEMICOLON: ';',
            ecodes.KEY_APOSTROPHE: "'",
            ecodes.KEY_GRAVE: '`',
            ecodes.KEY_BACKSLASH: '\\',
            ecodes.KEY_COMMA: ',',
            ecodes.KEY_DOT: '.',
            ecodes.KEY_SLASH: '/',
            # Arrow keys (ANSI escape sequences for terminal navigation)
            ecodes.KEY_UP: '\x1b[A',
            ecodes.KEY_DOWN: '\x1b[B',
            ecodes.KEY_RIGHT: '\x1b[C',
            ecodes.KEY_LEFT: '\x1b[D',
        }
        # F5-F8 are intentionally absent: they drive the quadrant zoom and are
        # handled before this table is consulted.
        for digit in '0123456789':
            key_map[getattr(ecodes, f'KEY_{digit}')] = digit
        for letter in 'abcdefghijklmnopqrstuvwxyz':
            key_map[getattr(ecodes, f'KEY_{letter.upper()}')] = letter
        return key_map

    def _write_to_pty(self, data):
        """Write bytes to the PTY; report (rather than raise) a failed write.

        Returns:
            True when the write succeeded, False otherwise.
        """
        try:
            os.write(self.master_fd, data)
            return True
        except OSError as e:
            # Kept separate from keyboard errors so a dead shell is not
            # misreported as a keyboard disconnect.
            self._safe_print(f"Error writing to terminal: {e}")
            return False

    def read_user_input(self):
        """Read user input directly from keyboard device with auto-reconnection.

        Translates evdev key presses into characters/escape sequences and
        writes them to the PTY. Ctrl+letter sends the matching control code
        (Ctrl+D additionally stops the loop), F5-F8 select a quadrant view and
        any other mapped key returns to the full-screen view.
        """
        if not HAS_EVDEV:
            self._safe_print("Warning: evdev not available, install with: pip3 install evdev")
            return

        # Proper QWERTY keyboard layout mapping based on actual evdev keycodes
        key_map = self._build_key_map()
        shift_map = self._SHIFT_MAP
        # Ctrl+A = 1, Ctrl+B = 2, ..., Ctrl+Z = 26 (built once, not per key press)
        ctrl_codes = {
            getattr(ecodes, f'KEY_{letter}'): index
            for index, letter in enumerate('ABCDEFGHIJKLMNOPQRSTUVWXYZ', start=1)
        }
        quadrant_keys = {
            ecodes.KEY_F5: ('F5', 'top_left'),
            ecodes.KEY_F6: ('F6', 'top_right'),
            ecodes.KEY_F7: ('F7', 'bottom_left'),
            ecodes.KEY_F8: ('F8', 'bottom_right'),
        }
        shift_keys = (ecodes.KEY_LEFTSHIFT, ecodes.KEY_RIGHTSHIFT)
        ctrl_keys = (ecodes.KEY_LEFTCTRL, ecodes.KEY_RIGHTCTRL)

        shift_pressed = False
        ctrl_pressed = False
        last_reconnect_attempt = 0

        while self.running:
            # Try to connect/reconnect to keyboard
            if self.keyboard_device is None:
                now = time.time()
                if now - last_reconnect_attempt >= self.keyboard_reconnect_interval:
                    last_reconnect_attempt = now
                    self.keyboard_device = self._find_keyboard_device()
                    if self.keyboard_device:
                        self.keyboard_path = self.keyboard_device.path
                        # Display connection status on screen
                        self.stream.feed(f"\r\n[Keyboard connected: {self.keyboard_device.name}]\r\n")
                        self.update_display()
                if self.keyboard_device is None:
                    time.sleep(0.1)
                    continue

            try:
                # Read keyboard events with timeout
                readable, _, _ = select.select([self.keyboard_device.fd], [], [], 0.1)
                if not readable:
                    continue
                for event in self.keyboard_device.read():
                    if event.type != ecodes.EV_KEY:
                        continue
                    code = event.code

                    # Debug: show all key events
                    # NOTE(review): this echoes every keystroke (including
                    # anything typed at a password prompt) to the console.
                    if event.value == 1:  # Key press
                        key_name = ecodes.KEY[code] if code in ecodes.KEY else f"UNKNOWN({code})"
                        self._safe_print(f"[DEBUG] Key pressed: {key_name} (code: {code})")

                    # Track modifier keys. evdev reports 0=release, 1=press,
                    # 2=auto-repeat; a held modifier must stay "pressed" while
                    # it repeats, so only a release clears it.
                    if code in shift_keys:
                        shift_pressed = event.value != 0
                        continue
                    if code in ctrl_keys:
                        ctrl_pressed = event.value != 0
                        continue

                    # Only process key press (releases and auto-repeats are ignored)
                    if event.value != 1:
                        continue

                    # Handle Ctrl+letter combinations
                    if ctrl_pressed and code in ctrl_codes:
                        self._write_to_pty(bytes([ctrl_codes[code]]))
                        # Ctrl+D exits the terminal
                        if code == ecodes.KEY_D:
                            self.stream.feed("\r\n[Exiting...]\r\n")
                            self.update_display()
                            self.running = False
                            break
                        continue

                    # Handle F5-F8 for quadrant zoom
                    if code in quadrant_keys:
                        key_label, view = quadrant_keys[code]
                        self._safe_print(
                            f"[DEBUG] {key_label} pressed - zooming to {view.replace('_', '-')} quadrant"
                        )
                        self._quadrant_view = view
                        self._quadrant_changed = True
                        self.update_display()
                        continue

                    # Map key to character
                    char = key_map.get(code)
                    if not char:
                        continue

                    # Reset quadrant view on any normal keypress
                    if self._quadrant_view is not None:
                        self._safe_print("[DEBUG] Resetting quadrant view to full screen")
                        self._quadrant_view = None
                        self._quadrant_changed = True
                        self.update_display()

                    # Apply shift modifier (ctrl is handled separately above)
                    if shift_pressed and not ctrl_pressed:
                        if char.isalpha():
                            char = char.upper()
                        elif char in shift_map:
                            char = shift_map[char]

                    # Send to PTY
                    self._write_to_pty(char.encode())
            except OSError:
                # Keyboard disconnected
                if self.keyboard_device:
                    self._safe_print("Keyboard disconnected, will retry...")
                    self.stream.feed("\r\n[Keyboard disconnected]\r\n")
                    self.update_display()
                    try:
                        self.keyboard_device.close()
                    except Exception:
                        pass
                self.keyboard_device = None
                # No release event will arrive for keys held at disconnect,
                # so drop the modifier state rather than leave it stuck.
                shift_pressed = False
                ctrl_pressed = False
                time.sleep(0.5)

    def _set_pty_window_size(self):
        """Tell the PTY its size so programs querying the terminal see cols x rows."""
        try:
            winsize = struct.pack('HHHH', self.rows, self.cols, 0, 0)
            fcntl.ioctl(self.master_fd, termios.TIOCSWINSZ, winsize)
        except OSError:
            # Best-effort: COLUMNS/LINES in the environment remain as a fallback.
            pass

    def spawn_shell(self, command=None):
        """Spawn a shell or run a command in the virtual terminal.

        Blocks until the child exits or Ctrl+C is pressed, then stops the
        worker threads, performs a final refresh and puts the panel to sleep.

        Args:
            command: Program to execute; defaults to $SHELL or /bin/bash.
        """
        if command is None:
            command = os.environ.get('SHELL', '/bin/bash')

        # Set up environment
        env = os.environ.copy()
        env['TERM'] = 'linux'
        env['COLUMNS'] = str(self.cols)
        env['LINES'] = str(self.rows)

        # Create a pseudo-terminal
        pid, self.master_fd = pty.fork()
        if pid == 0:  # Child process
            try:
                # Execute the shell
                os.execvpe(command, [command], env)
            except OSError as e:
                # stderr is the PTY here, so the message shows up on the display.
                os.write(2, f"exec failed: {command}: {e}\r\n".encode())
            # Never fall back into the parent's code path in the forked child.
            os._exit(127)

        # Parent process
        self._set_pty_window_size()
        self.running = True

        # Start reading thread for terminal output
        read_thread = threading.Thread(target=self.read_terminal_output)
        read_thread.daemon = True
        read_thread.start()

        # Start input thread for user input
        input_thread = threading.Thread(target=self.read_user_input)
        input_thread.daemon = True
        input_thread.start()

        self._safe_print(f"\nVirtual terminal started (PID: {pid})")
        self._safe_print("Terminal output will be displayed above and on the e-ink display")
        self._safe_print("Type commands normally - they will be sent to the virtual terminal")
        self._safe_print("Press Ctrl+C or Ctrl+D to exit\n")

        try:
            while True:
                # Check if child process is still running
                try:
                    pid_status, _ = os.waitpid(pid, os.WNOHANG)
                except ChildProcessError:
                    break
                if pid_status != 0:
                    break
                time.sleep(0.5)
        except KeyboardInterrupt:
            self._safe_print("\nShutting down...")
        finally:
            self.running = False
            # Cancel any pending update and do final refresh
            with self._update_lock:
                if self._update_timer is not None:
                    self._update_timer.cancel()
                    self._update_timer = None
            self._do_update_display()
            # Give threads time to finish
            time.sleep(0.2)
            os.close(self.master_fd)
            if self.use_display and self.epd:
                try:
                    self._safe_print("Putting e-ink display to sleep...")
                    with self._silence_console():
                        self.epd.sleep()
                except Exception:
                    # Best-effort shutdown; nothing useful to do if sleep fails.
                    pass

    def run_command(self, command):
        """Run a single command and display output.

        Args:
            command: Shell command line; it is run through the shell with a
                30 second timeout and its stdout followed by stderr is shown.
        """
        self.running = True
        try:
            # Run the command and capture output
            result = subprocess.run(command, shell=True, capture_output=True,
                                    text=True, timeout=30)
            output = result.stdout + result.stderr
            # Captured output has bare '\n' line ends (no tty in between to add
            # a carriage return), and pyte only moves down on LF; normalise to
            # CRLF so each line starts at column 0.
            output = output.replace('\r\n', '\n').replace('\n', '\r\n')
            # Feed output to pyte
            self.stream.feed(output)
            # Update display (force immediate for single commands)
            self._do_update_display()
        except subprocess.TimeoutExpired:
            self._safe_print("Command timed out")
        except Exception as e:
            self._safe_print(f"Error running command: {e}")
        finally:
            self.running = False
            if self.use_display and self.epd:
                try:
                    with self._silence_console():
                        self.epd.sleep()
                except Exception:
                    # Best-effort shutdown; nothing useful to do if sleep fails.
                    pass
def main():
    """Command-line entry point: parse options, then run a command or a shell."""
    import argparse

    parser = argparse.ArgumentParser(description='Virtual Terminal for E-Ink Display')
    add = parser.add_argument
    add('-d', '--display',
        choices=list(DISPLAY_CONFIGS.keys()),
        default='epd4in2',
        help='E-ink display type')
    add('-n', '--no-display',
        action='store_true',
        help='Preview only, do not use actual e-ink display')
    add('-c', '--command',
        help='Run a single command instead of spawning a shell')
    add('-s', '--shell',
        help='Shell to use (default: $SHELL or /bin/bash)')
    add('-z', '--zoom',
        type=float,
        default=1.0,
        help='Zoom multiplier for text size (e.g., 2.0 for 2x larger text)')
    add('-p', '--no-preview',
        action='store_true',
        help='Disable ASCII preview in console output (useful for service mode)')
    options = parser.parse_args()

    # Create virtual terminal
    terminal = VirtualTerminalEInk(
        display_type=options.display,
        use_display=not options.no_display,
        zoom=options.zoom,
        show_preview=not options.no_preview,
    )

    # A single command takes precedence over an interactive shell.
    if options.command:
        terminal.run_command(options.command)
        return

    shell = options.shell or os.environ.get('SHELL', '/bin/bash')
    terminal.spawn_shell(shell)


if __name__ == '__main__':
    main()