951 lines
41 KiB
Python
Executable file
951 lines
41 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
# -*- coding:utf-8 -*-
|
|
"""
|
|
Virtual Terminal for E-Ink Display
|
|
Spawns a virtual terminal, displays preview in console, and writes to e-ink display
|
|
"""
|
|
|
|
import sys
|
|
import os
|
|
import time
|
|
import threading
|
|
import pty
|
|
import select
|
|
import subprocess
|
|
import shutil
|
|
import contextlib
|
|
import glob
|
|
from PIL import Image, ImageDraw, ImageFont, ImageOps
|
|
|
|
try:
|
|
from evdev import InputDevice, categorize, ecodes, list_devices
|
|
HAS_EVDEV = True
|
|
except ImportError:
|
|
HAS_EVDEV = False
|
|
|
|
# Add the waveshare library path
|
|
libdir = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'e-Paper/RaspberryPi_JetsonNano/python/lib')
|
|
if os.path.exists(libdir):
|
|
sys.path.append(libdir)
|
|
|
|
try:
|
|
import pyte
|
|
except ImportError:
|
|
print("Error: pyte library not found. Install with: pip3 install pyte")
|
|
sys.exit(1)
|
|
|
|
# Configuration for different display types
|
|
DISPLAY_CONFIGS = {
|
|
'epd2in13_V4': {
|
|
'width': 122,
|
|
'height': 250,
|
|
'module': 'epd2in13_V4',
|
|
'char_width': 6, # Width of each character in pixels
|
|
'char_height': 12, # Height of each character in pixels
|
|
},
|
|
'epd2in7': {
|
|
'width': 176,
|
|
'height': 264,
|
|
'module': 'epd2in7',
|
|
'char_width': 6,
|
|
'char_height': 12,
|
|
},
|
|
'epd4in2': {
|
|
'width': 400,
|
|
'height': 300,
|
|
'module': 'epd4in2',
|
|
'char_width': 7,
|
|
'char_height': 14,
|
|
'font_size': 12,
|
|
},
|
|
'epd7in5_V2': {
|
|
'width': 800,
|
|
'height': 480,
|
|
'module': 'epd7in5_V2',
|
|
'char_width': 8,
|
|
'char_height': 16,
|
|
},
|
|
}
|
|
|
|
class VirtualTerminalEInk:
|
|
def __init__(self, display_type='epd2in13_V4', use_display=True, zoom=1.0, show_preview=True):
|
|
"""
|
|
Initialize the virtual terminal for e-ink display
|
|
|
|
Args:
|
|
display_type: Type of e-ink display (see DISPLAY_CONFIGS)
|
|
use_display: Whether to actually use the e-ink display (False for testing)
|
|
zoom: Zoom multiplier for rendering (1.0 = normal, 2.0 = 2x larger text, etc.)
|
|
show_preview: Whether to show ASCII preview in console output
|
|
"""
|
|
self.display_type = display_type
|
|
self.use_display = use_display
|
|
self.zoom = max(1.0, float(zoom)) # Ensure zoom is at least 1.0
|
|
self.show_preview = show_preview
|
|
self.config = DISPLAY_CONFIGS.get(display_type)
|
|
|
|
# Serialize console output (preview redraw + status messages) so it doesn't
|
|
# get interleaved with background threads or noisy hardware drivers.
|
|
self._console_lock = threading.RLock()
|
|
|
|
if not self.config:
|
|
raise ValueError(f"Unknown display type: {display_type}")
|
|
|
|
# Load font and measure actual character dimensions
|
|
self.font = self._load_font()
|
|
self._measure_font_metrics()
|
|
|
|
# Calculate terminal size in characters (accounting for zoom)
|
|
# With zoom, we render to a smaller virtual terminal and scale it up
|
|
self.cols = int(self.config['width'] / (self.char_width * self.zoom))
|
|
self.rows = int(self.config['height'] / (self.char_height * self.zoom))
|
|
|
|
self._safe_print(f"Display: {display_type} ({self.config['width']}x{self.config['height']})")
|
|
self._safe_print(f"Font metrics: {self.char_width}x{self.char_height} pixels per character")
|
|
self._safe_print(f"Terminal size: {self.cols}x{self.rows} characters")
|
|
|
|
# Initialize pyte screen
|
|
self.screen = pyte.Screen(self.cols, self.rows)
|
|
self.stream = pyte.Stream(self.screen)
|
|
|
|
# Track screen deltas for partial refresh.
|
|
self._last_snapshot = None
|
|
self._last_cursor = None
|
|
self._partial_ready = False
|
|
self._partial_enabled = True
|
|
|
|
# Debounce display updates to batch rapid changes (typing)
|
|
self._update_debounce_ms = 60 # milliseconds to wait before refreshing
|
|
self._pending_update = False
|
|
self._update_timer = None
|
|
self._update_lock = threading.Lock()
|
|
|
|
# Quadrant zoom feature (F5-F8 keys)
|
|
self._quadrant_view = None # None, 'top_left', 'top_right', 'bottom_left', 'bottom_right'
|
|
self._quadrant_changed = False # Flag to force refresh when quadrant changes
|
|
|
|
# Initialize e-ink display
|
|
self.epd = None
|
|
if self.use_display:
|
|
try:
|
|
module = __import__(f'waveshare_epd.{self.config["module"]}', fromlist=[self.config['module']])
|
|
self.epd = module.EPD()
|
|
self._safe_print("Initializing e-ink display...")
|
|
with self._silence_console():
|
|
self.epd.init()
|
|
# Call Clear without arguments for displays that don't take parameters
|
|
try:
|
|
self.epd.Clear(0xFF)
|
|
except TypeError:
|
|
self.epd.Clear()
|
|
self._safe_print("E-ink display ready!")
|
|
|
|
# If supported, switch the panel into partial-update mode so we can
|
|
# update only the changed region (e.g., per typed character).
|
|
self._prepare_partial_mode_if_supported()
|
|
except Exception as e:
|
|
self._safe_print(f"Warning: Could not initialize e-ink display: {e}")
|
|
self._safe_print("Running in preview-only mode")
|
|
self.use_display = False
|
|
|
|
# Terminal process
|
|
self.master_fd = None
|
|
self.running = False
|
|
|
|
# Direct keyboard input (headless mode)
|
|
self.keyboard_device = None
|
|
self.keyboard_path = None
|
|
self.keyboard_reconnect_interval = 2.0 # seconds
|
|
|
|
def _safe_print(self, *args, **kwargs):
|
|
# When the terminal is put into raw-ish modes (see read_user_input), some
|
|
# terminals stop translating '\n' into CRLF. Using CRLF avoids "stair-step"
|
|
# indentation in the preview.
|
|
with self._console_lock:
|
|
end = kwargs.pop('end', '\r\n')
|
|
flush = kwargs.pop('flush', True)
|
|
print(*args, end=end, flush=flush, **kwargs)
|
|
|
|
@contextlib.contextmanager
|
|
def _silence_console(self):
|
|
"""Temporarily silence stdout/stderr (e.g., Waveshare driver prints)."""
|
|
with open(os.devnull, 'w') as devnull:
|
|
with contextlib.redirect_stdout(devnull), contextlib.redirect_stderr(devnull):
|
|
yield
|
|
|
|
def _epd_supports_partial_window(self):
|
|
"""Return True if the active driver exposes a windowed partial refresh API."""
|
|
if not self.epd:
|
|
return False
|
|
|
|
# epd4in2 exposes a dedicated region updater.
|
|
if hasattr(self.epd, 'EPD_4IN2_PartialDisplay'):
|
|
return True
|
|
|
|
# Many other drivers expose display_Partial(Image, Xstart, Ystart, Xend, Yend)
|
|
if hasattr(self.epd, 'display_Partial'):
|
|
return True
|
|
|
|
return False
|
|
|
|
def _prepare_partial_mode_if_supported(self):
|
|
if not (self.use_display and self.epd and self._partial_enabled):
|
|
return
|
|
if not self._epd_supports_partial_window():
|
|
return
|
|
|
|
# Some drivers require switching LUT/mode via init_Partial.
|
|
if hasattr(self.epd, 'init_Partial'):
|
|
try:
|
|
with self._silence_console():
|
|
self.epd.init_Partial()
|
|
self._partial_ready = True
|
|
except Exception:
|
|
self._partial_ready = False
|
|
|
|
def _snapshot_screen(self):
|
|
"""Return a compact snapshot of the current pyte screen state."""
|
|
snapshot = []
|
|
buf = self.screen.buffer
|
|
for row in range(self.rows):
|
|
row_cells = []
|
|
line = buf[row]
|
|
for col in range(self.cols):
|
|
ch = line[col]
|
|
row_cells.append((ch.data, bool(getattr(ch, 'reverse', False))))
|
|
snapshot.append(row_cells)
|
|
return snapshot
|
|
|
|
def _compute_dirty_cell_bbox(self, prev_snapshot, curr_snapshot, prev_cursor, curr_cursor):
|
|
"""Compute bounding box of changed cells (inclusive). Returns (min_c, min_r, max_c, max_r) or None."""
|
|
if prev_snapshot is None:
|
|
return (0, 0, self.cols - 1, self.rows - 1)
|
|
|
|
min_c = None
|
|
min_r = None
|
|
max_c = None
|
|
max_r = None
|
|
|
|
for r in range(self.rows):
|
|
prev_row = prev_snapshot[r]
|
|
curr_row = curr_snapshot[r]
|
|
for c in range(self.cols):
|
|
if prev_row[c] != curr_row[c]:
|
|
if min_c is None:
|
|
min_c = max_c = c
|
|
min_r = max_r = r
|
|
else:
|
|
min_c = min(min_c, c)
|
|
max_c = max(max_c, c)
|
|
min_r = min(min_r, r)
|
|
max_r = max(max_r, r)
|
|
|
|
# Always refresh old + new cursor cells so underline moves cleanly.
|
|
for cur in (prev_cursor, curr_cursor):
|
|
if not cur:
|
|
continue
|
|
c, r = cur
|
|
if 0 <= c < self.cols and 0 <= r < self.rows:
|
|
if min_c is None:
|
|
min_c = max_c = c
|
|
min_r = max_r = r
|
|
else:
|
|
min_c = min(min_c, c)
|
|
max_c = max(max_c, c)
|
|
min_r = min(min_r, r)
|
|
max_r = max(max_r, r)
|
|
|
|
if min_c is None:
|
|
return None
|
|
return (min_c, min_r, max_c, max_r)
|
|
|
|
def _cell_bbox_to_pixel_window(self, cell_bbox):
|
|
"""Convert cell bbox to pixel window (x_start, y_start, x_end, y_end), end-exclusive."""
|
|
min_c, min_r, max_c, max_r = cell_bbox
|
|
x_start = min_c * self.char_width
|
|
y_start = min_r * self.char_height
|
|
x_end = (max_c + 1) * self.char_width
|
|
y_end = (max_r + 1) * self.char_height
|
|
|
|
# Expand Y slightly to ensure cursor underline and descenders are fully captured
|
|
y_end = min(y_end + 2, self.config['height'])
|
|
|
|
# Clamp to display bounds.
|
|
x_start = max(0, min(x_start, self.config['width']))
|
|
y_start = max(0, min(y_start, self.config['height']))
|
|
x_end = max(0, min(x_end, self.config['width']))
|
|
y_end = max(0, min(y_end, self.config['height']))
|
|
|
|
# Align X to byte boundaries (8px) for most controllers.
|
|
x_start = (x_start // 8) * 8
|
|
x_end = ((x_end + 7) // 8) * 8
|
|
x_end = max(x_start + 8, min(x_end, self.config['width']))
|
|
|
|
# Ensure non-empty window.
|
|
if y_end <= y_start:
|
|
y_end = min(y_start + self.char_height, self.config['height'])
|
|
|
|
return (x_start, y_start, x_end, y_end)
|
|
|
|
def _find_keyboard_device(self):
|
|
"""Find a keyboard input device, preferring Bluetooth keyboards."""
|
|
if not HAS_EVDEV:
|
|
return None
|
|
|
|
try:
|
|
devices = [InputDevice(path) for path in list_devices()]
|
|
keyboards = []
|
|
|
|
for device in devices:
|
|
# Check if device has keyboard capabilities
|
|
caps = device.capabilities()
|
|
if ecodes.EV_KEY in caps:
|
|
# Check for typical keyboard keys
|
|
keys = caps[ecodes.EV_KEY]
|
|
has_letters = any(k in keys for k in range(ecodes.KEY_Q, ecodes.KEY_P + 1))
|
|
if has_letters:
|
|
# Prefer Bluetooth keyboards
|
|
is_bluetooth = 'bluetooth' in device.name.lower() or 'bt' in device.name.lower()
|
|
keyboards.append((device, is_bluetooth))
|
|
|
|
if not keyboards:
|
|
return None
|
|
|
|
# Sort by Bluetooth preference
|
|
keyboards.sort(key=lambda x: x[1], reverse=True)
|
|
selected = keyboards[0][0]
|
|
self._safe_print(f"Found keyboard: {selected.name} ({selected.path})")
|
|
return selected
|
|
except Exception as e:
|
|
self._safe_print(f"Error finding keyboard: {e}")
|
|
return None
|
|
|
|
def _epd_sync_partial_base_from_buffer(self, buffer_bytes):
|
|
"""Keep driver-side 'old image' buffer in sync for partial updates."""
|
|
if not self.epd or not hasattr(self.epd, 'DATA'):
|
|
return
|
|
try:
|
|
# The driver inverts the buffer itself during partial refresh,
|
|
# so we store it as-is (not inverted).
|
|
self.epd.DATA = list(buffer_bytes)
|
|
except Exception:
|
|
pass
|
|
|
|
def _load_font(self):
|
|
"""Load a monospace font"""
|
|
font_size = self.config.get('font_size', self.config['char_height'] - 2)
|
|
|
|
# Try to use a monospace font, fall back to default if not available
|
|
for font_path in [
|
|
'/usr/share/fonts/truetype/dejavu/DejaVuSansMono.ttf',
|
|
'/usr/share/fonts/truetype/liberation/LiberationMono-Regular.ttf',
|
|
'/usr/share/fonts/truetype/freefont/FreeMono.ttf',
|
|
]:
|
|
try:
|
|
return ImageFont.truetype(font_path, font_size)
|
|
except:
|
|
continue
|
|
|
|
return ImageFont.load_default()
|
|
|
|
def _measure_font_metrics(self):
|
|
"""Measure actual character dimensions from the loaded font"""
|
|
# Create a temporary image to measure text
|
|
temp_img = Image.new('1', (200, 100), 255)
|
|
draw = ImageDraw.Draw(temp_img)
|
|
|
|
# Measure multiple characters to get accurate width
|
|
test_str = "MMMMMMMMMM" # 10 M's
|
|
bbox = draw.textbbox((0, 0), test_str, font=self.font)
|
|
avg_width = (bbox[2] - bbox[0]) / 10
|
|
|
|
# Measure height
|
|
bbox_height = draw.textbbox((0, 0), 'M', font=self.font)
|
|
height = bbox_height[3] - bbox_height[1]
|
|
|
|
# Round up to ensure characters don't overlap
|
|
self.char_width = int(avg_width) + 1
|
|
self.char_height = int(height) + 2
|
|
|
|
# Ensure minimum dimensions
|
|
if self.char_width < 6:
|
|
self.char_width = 6
|
|
if self.char_height < 10:
|
|
self.char_height = 10
|
|
|
|
self._safe_print(f"DEBUG: Measured char dimensions - width: {self.char_width}, height: {self.char_height}")
|
|
|
|
def render_to_image(self):
|
|
"""Render the virtual terminal screen to a PIL Image"""
|
|
# Create a white image
|
|
image = Image.new('1', (self.config['width'], self.config['height']), 255)
|
|
draw = ImageDraw.Draw(image)
|
|
|
|
# Draw each character
|
|
for row in range(self.rows):
|
|
for col in range(self.cols):
|
|
char = self.screen.buffer[row][col]
|
|
if char.data != ' ':
|
|
x = col * self.char_width
|
|
y = row * self.char_height
|
|
|
|
# Draw character (0 = black, 255 = white)
|
|
if char.reverse:
|
|
# Inverted colors for reverse video
|
|
draw.rectangle([(x, y), (x + self.char_width,
|
|
y + self.char_height)], fill=0)
|
|
draw.text((x, y), char.data, font=self.font, fill=255)
|
|
else:
|
|
draw.text((x, y), char.data, font=self.font, fill=0)
|
|
|
|
# Draw cursor
|
|
cursor_row = self.screen.cursor.y
|
|
cursor_col = self.screen.cursor.x
|
|
cursor_x = cursor_col * self.char_width
|
|
cursor_y = cursor_row * self.char_height
|
|
draw.rectangle([(cursor_x, cursor_y + self.char_height - 2),
|
|
(cursor_x + self.char_width, cursor_y + self.char_height)],
|
|
fill=0)
|
|
|
|
return image
|
|
|
|
def display_preview(self):
|
|
"""Display ASCII preview of the terminal in the console"""
|
|
with self._console_lock:
|
|
term_size = shutil.get_terminal_size(fallback=(80, 24))
|
|
term_cols = max(20, term_size.columns)
|
|
term_lines = max(10, term_size.lines)
|
|
|
|
# Keep a small bottom area for status lines.
|
|
max_visible_rows = max(1, term_lines - 4)
|
|
visible_rows = min(self.rows, max_visible_rows)
|
|
|
|
# Clamp preview width to terminal width so the border never wraps.
|
|
max_visible_cols = max(10, term_cols - 2)
|
|
visible_cols = min(self.cols, max_visible_cols)
|
|
|
|
cursor_col = self.screen.cursor.x
|
|
cursor_row = self.screen.cursor.y
|
|
|
|
# If the virtual terminal is wider than our real terminal, horizontally scroll
|
|
# so the cursor stays visible.
|
|
if visible_cols < self.cols:
|
|
start_col = cursor_col - (visible_cols // 2)
|
|
start_col = max(0, min(start_col, self.cols - visible_cols))
|
|
else:
|
|
start_col = 0
|
|
|
|
# Clear screen and redraw in-place.
|
|
sys.stdout.write("\x1b[2J\x1b[H")
|
|
|
|
border = "=" * (visible_cols + 2)
|
|
sys.stdout.write(border + "\r\n")
|
|
|
|
for row in range(visible_rows):
|
|
# Some pyte Line implementations support slicing; if not, fall back.
|
|
try:
|
|
row_slice = self.screen.buffer[row][start_col:start_col + visible_cols]
|
|
line = "".join(ch.data for ch in row_slice)
|
|
except Exception:
|
|
line_chars = []
|
|
for col in range(start_col, start_col + visible_cols):
|
|
line_chars.append(self.screen.buffer[row][col].data)
|
|
line = "".join(line_chars)
|
|
|
|
# Pad to keep the right border aligned.
|
|
if len(line) < visible_cols:
|
|
line = line.ljust(visible_cols)
|
|
sys.stdout.write("|" + line + "|\r\n")
|
|
|
|
sys.stdout.write(border + "\r\n")
|
|
if start_col:
|
|
sys.stdout.write(
|
|
f"Cursor at: ({cursor_col}, {cursor_row}) view: cols {start_col}-{start_col + visible_cols - 1}\r\n"
|
|
)
|
|
else:
|
|
sys.stdout.write(f"Cursor at: ({cursor_col}, {cursor_row})\r\n")
|
|
|
|
sys.stdout.flush()
|
|
|
|
def _do_update_display(self):
|
|
"""Internal: Actually perform the display update (called after debounce)."""
|
|
with self._console_lock:
|
|
# Display preview in console (if enabled)
|
|
if self.show_preview:
|
|
self.display_preview()
|
|
|
|
# Update e-ink display (silence driver prints to avoid tearing the preview)
|
|
if self.use_display and self.epd:
|
|
try:
|
|
curr_snapshot = self._snapshot_screen()
|
|
curr_cursor = (self.screen.cursor.x, self.screen.cursor.y)
|
|
dirty_cells = self._compute_dirty_cell_bbox(
|
|
self._last_snapshot,
|
|
curr_snapshot,
|
|
self._last_cursor,
|
|
curr_cursor,
|
|
)
|
|
|
|
# Force update if quadrant view changed (even if screen didn't change)
|
|
if self._quadrant_changed:
|
|
self._safe_print(f"[DEBUG] Forcing display update due to quadrant view change")
|
|
dirty_cells = (0, 0, self.cols - 1, self.rows - 1) # Force full refresh
|
|
self._quadrant_changed = False # Reset flag
|
|
|
|
# Nothing changed -> skip panel update.
|
|
if dirty_cells is None:
|
|
self._last_snapshot = curr_snapshot
|
|
self._last_cursor = curr_cursor
|
|
return
|
|
|
|
# Heuristic: large changes -> full refresh.
|
|
min_c, min_r, max_c, max_r = dirty_cells
|
|
changed_area = (max_c - min_c + 1) * (max_r - min_r + 1)
|
|
total_area = self.cols * self.rows
|
|
do_full = (self._last_snapshot is None) or (changed_area > total_area * 0.35)
|
|
|
|
image = self.render_to_image()
|
|
|
|
# Apply quadrant view if active (before zoom)
|
|
if self._quadrant_view is not None:
|
|
self._safe_print(f"[DEBUG] Applying quadrant view: {self._quadrant_view}")
|
|
# Crop to the selected quadrant
|
|
half_width = image.width // 2
|
|
half_height = image.height // 2
|
|
|
|
if self._quadrant_view == 'top_left':
|
|
image = image.crop((0, 0, half_width, half_height))
|
|
elif self._quadrant_view == 'top_right':
|
|
image = image.crop((half_width, 0, image.width, half_height))
|
|
elif self._quadrant_view == 'bottom_left':
|
|
image = image.crop((0, half_height, half_width, image.height))
|
|
elif self._quadrant_view == 'bottom_right':
|
|
image = image.crop((half_width, half_height, image.width, image.height))
|
|
|
|
self._safe_print(f"[DEBUG] Cropped to size: {image.size}, scaling to display: {self.config['width']}x{self.config['height']}")
|
|
# Scale the quadrant to full display size
|
|
image = image.resize((self.config['width'], self.config['height']), Image.NEAREST)
|
|
|
|
# Apply zoom if needed (independent of quadrant view)
|
|
if self.zoom > 1.0:
|
|
new_width = int(image.width * self.zoom)
|
|
new_height = int(image.height * self.zoom)
|
|
# Use NEAREST for crisp text scaling
|
|
image = image.resize((new_width, new_height), Image.NEAREST)
|
|
# Crop to display size if zoomed image is larger
|
|
if new_width > self.config['width'] or new_height > self.config['height']:
|
|
image = image.crop((0, 0, self.config['width'], self.config['height']))
|
|
|
|
# Prefer partial window update when supported.
|
|
if (not do_full) and self._partial_enabled and self._epd_supports_partial_window():
|
|
if not self._partial_ready:
|
|
self._prepare_partial_mode_if_supported()
|
|
|
|
x_start, y_start, x_end, y_end = self._cell_bbox_to_pixel_window(dirty_cells)
|
|
|
|
# Partial refresh: EPD_4IN2_PartialDisplay inverts internally, so use raw buffer
|
|
buffer_bytes = self.epd.getbuffer(image)
|
|
|
|
with self._silence_console():
|
|
if hasattr(self.epd, 'EPD_4IN2_PartialDisplay'):
|
|
# epd4in2 expects full-frame buffer and a pixel window.
|
|
# The driver inverts and stores in DATA internally, so we don't sync here.
|
|
self.epd.EPD_4IN2_PartialDisplay(x_start, y_start, x_end, y_end, buffer_bytes)
|
|
elif hasattr(self.epd, 'display_Partial'):
|
|
# Many drivers accept a PIL Image + window. If this driver
|
|
# has a different signature, it will raise and we fall back.
|
|
try:
|
|
self.epd.display_Partial(image, x_start, y_start, x_end, y_end)
|
|
except TypeError:
|
|
# Some implementations only accept a single image.
|
|
self.epd.display_Partial(image)
|
|
# Sync for other drivers that may not maintain DATA
|
|
self._epd_sync_partial_base_from_buffer(buffer_bytes)
|
|
else:
|
|
# Full refresh: display() expects inverted polarity
|
|
image_inverted = ImageOps.invert(image.convert('L')).convert('1')
|
|
buffer_bytes = self.epd.getbuffer(image_inverted)
|
|
|
|
with self._silence_console():
|
|
self.epd.display(buffer_bytes)
|
|
self._epd_sync_partial_base_from_buffer(buffer_bytes)
|
|
|
|
# Re-enter partial mode after a full refresh if the driver supports it.
|
|
self._partial_ready = False
|
|
self._prepare_partial_mode_if_supported()
|
|
|
|
self._last_snapshot = curr_snapshot
|
|
self._last_cursor = curr_cursor
|
|
except Exception as e:
|
|
self._safe_print(f"Error updating e-ink display: {e}")
|
|
|
|
def update_display(self):
|
|
"""Schedule a debounced display update to batch rapid changes together."""
|
|
with self._update_lock:
|
|
# Cancel any pending timer
|
|
if self._update_timer is not None:
|
|
self._update_timer.cancel()
|
|
|
|
# Schedule new update after debounce period
|
|
self._update_timer = threading.Timer(
|
|
self._update_debounce_ms / 1000.0,
|
|
self._do_update_display
|
|
)
|
|
self._update_timer.daemon = True
|
|
self._update_timer.start()
|
|
|
|
def read_terminal_output(self):
|
|
"""Read output from the terminal and feed it to pyte"""
|
|
while self.running:
|
|
# Check if there's data to read
|
|
r, w, e = select.select([self.master_fd], [], [], 0.1)
|
|
if self.master_fd in r:
|
|
try:
|
|
data = os.read(self.master_fd, 1024)
|
|
if data:
|
|
# Feed data to pyte stream
|
|
self.stream.feed(data.decode('utf-8', errors='replace'))
|
|
# Update display
|
|
self.update_display()
|
|
except OSError:
|
|
break
|
|
|
|
def read_user_input(self):
|
|
"""Read user input directly from keyboard device with auto-reconnection."""
|
|
if not HAS_EVDEV:
|
|
self._safe_print("Warning: evdev not available, install with: pip3 install evdev")
|
|
return
|
|
|
|
# Proper QWERTY keyboard layout mapping based on actual evdev keycodes
|
|
key_map = {
|
|
# Special keys
|
|
ecodes.KEY_ENTER: '\r', # Send carriage return (terminal driver handles conversion)
|
|
ecodes.KEY_TAB: '\t',
|
|
ecodes.KEY_SPACE: ' ',
|
|
ecodes.KEY_BACKSPACE: '\x7f',
|
|
ecodes.KEY_ESC: '\x1b',
|
|
ecodes.KEY_MINUS: '-',
|
|
ecodes.KEY_EQUAL: '=',
|
|
ecodes.KEY_LEFTBRACE: '[',
|
|
ecodes.KEY_RIGHTBRACE: ']',
|
|
ecodes.KEY_SEMICOLON: ';',
|
|
ecodes.KEY_APOSTROPHE: "'",
|
|
ecodes.KEY_GRAVE: '`',
|
|
ecodes.KEY_BACKSLASH: '\\',
|
|
ecodes.KEY_COMMA: ',',
|
|
ecodes.KEY_DOT: '.',
|
|
ecodes.KEY_SLASH: '/',
|
|
|
|
# Arrow keys (ANSI escape sequences for terminal navigation)
|
|
ecodes.KEY_UP: '\x1b[A',
|
|
ecodes.KEY_DOWN: '\x1b[B',
|
|
ecodes.KEY_RIGHT: '\x1b[C',
|
|
ecodes.KEY_LEFT: '\x1b[D',
|
|
|
|
# Function keys (for quadrant zoom)
|
|
ecodes.KEY_F5: '<F5>',
|
|
ecodes.KEY_F6: '<F6>',
|
|
ecodes.KEY_F7: '<F7>',
|
|
ecodes.KEY_F8: '<F8>',
|
|
|
|
# Number row (unshifted)
|
|
ecodes.KEY_1: '1', ecodes.KEY_2: '2', ecodes.KEY_3: '3',
|
|
ecodes.KEY_4: '4', ecodes.KEY_5: '5', ecodes.KEY_6: '6',
|
|
ecodes.KEY_7: '7', ecodes.KEY_8: '8', ecodes.KEY_9: '9',
|
|
ecodes.KEY_0: '0',
|
|
|
|
# Letter keys (lowercase)
|
|
ecodes.KEY_Q: 'q', ecodes.KEY_W: 'w', ecodes.KEY_E: 'e', ecodes.KEY_R: 'r',
|
|
ecodes.KEY_T: 't', ecodes.KEY_Y: 'y', ecodes.KEY_U: 'u', ecodes.KEY_I: 'i',
|
|
ecodes.KEY_O: 'o', ecodes.KEY_P: 'p', ecodes.KEY_A: 'a', ecodes.KEY_S: 's',
|
|
ecodes.KEY_D: 'd', ecodes.KEY_F: 'f', ecodes.KEY_G: 'g', ecodes.KEY_H: 'h',
|
|
ecodes.KEY_J: 'j', ecodes.KEY_K: 'k', ecodes.KEY_L: 'l', ecodes.KEY_Z: 'z',
|
|
ecodes.KEY_X: 'x', ecodes.KEY_C: 'c', ecodes.KEY_V: 'v', ecodes.KEY_B: 'b',
|
|
ecodes.KEY_N: 'n', ecodes.KEY_M: 'm',
|
|
}
|
|
|
|
# Shifted character mappings
|
|
shift_map = {
|
|
'1': '!', '2': '@', '3': '#', '4': '$', '5': '%',
|
|
'6': '^', '7': '&', '8': '*', '9': '(', '0': ')',
|
|
'-': '_', '=': '+', '[': '{', ']': '}', '\\': '|',
|
|
';': ':', "'": '"', '`': '~', ',': '<', '.': '>', '/': '?',
|
|
}
|
|
|
|
shift_pressed = False
|
|
ctrl_pressed = False
|
|
last_reconnect_attempt = 0
|
|
|
|
while self.running:
|
|
# Try to connect/reconnect to keyboard
|
|
if self.keyboard_device is None:
|
|
current_time = time.time()
|
|
if current_time - last_reconnect_attempt >= self.keyboard_reconnect_interval:
|
|
last_reconnect_attempt = current_time
|
|
self.keyboard_device = self._find_keyboard_device()
|
|
if self.keyboard_device:
|
|
self.keyboard_path = self.keyboard_device.path
|
|
# Display connection status on screen
|
|
self.stream.feed(f"\r\n[Keyboard connected: {self.keyboard_device.name}]\r\n")
|
|
self.update_display()
|
|
else:
|
|
time.sleep(0.5)
|
|
continue
|
|
|
|
try:
|
|
# Check device is still valid before select
|
|
if self.keyboard_device is None:
|
|
time.sleep(0.1)
|
|
continue
|
|
|
|
# Read keyboard events with timeout
|
|
r, w, e = select.select([self.keyboard_device.fd], [], [], 0.1)
|
|
if not r:
|
|
continue
|
|
|
|
for event in self.keyboard_device.read():
|
|
if event.type == ecodes.EV_KEY:
|
|
key_event = categorize(event)
|
|
|
|
# Debug: show all key events
|
|
if event.value == 1: # Key press
|
|
key_name = ecodes.KEY[event.code] if event.code in ecodes.KEY else f"UNKNOWN({event.code})"
|
|
self._safe_print(f"[DEBUG] Key pressed: {key_name} (code: {event.code})")
|
|
|
|
# Track modifier keys
|
|
if event.code in (ecodes.KEY_LEFTSHIFT, ecodes.KEY_RIGHTSHIFT):
|
|
shift_pressed = (event.value == 1)
|
|
continue
|
|
elif event.code in (ecodes.KEY_LEFTCTRL, ecodes.KEY_RIGHTCTRL):
|
|
ctrl_pressed = (event.value == 1)
|
|
continue
|
|
|
|
# Only process key press (not release)
|
|
if event.value != 1:
|
|
continue
|
|
|
|
# Handle Ctrl+letter combinations
|
|
if ctrl_pressed:
|
|
# Map letter keys to Ctrl codes (Ctrl+A = 1, Ctrl+B = 2, ..., Ctrl+Z = 26)
|
|
ctrl_letter_map = {
|
|
ecodes.KEY_A: 1, ecodes.KEY_B: 2, ecodes.KEY_C: 3, ecodes.KEY_D: 4,
|
|
ecodes.KEY_E: 5, ecodes.KEY_F: 6, ecodes.KEY_G: 7, ecodes.KEY_H: 8,
|
|
ecodes.KEY_I: 9, ecodes.KEY_J: 10, ecodes.KEY_K: 11, ecodes.KEY_L: 12,
|
|
ecodes.KEY_M: 13, ecodes.KEY_N: 14, ecodes.KEY_O: 15, ecodes.KEY_P: 16,
|
|
ecodes.KEY_Q: 17, ecodes.KEY_R: 18, ecodes.KEY_S: 19, ecodes.KEY_T: 20,
|
|
ecodes.KEY_U: 21, ecodes.KEY_V: 22, ecodes.KEY_W: 23, ecodes.KEY_X: 24,
|
|
ecodes.KEY_Y: 25, ecodes.KEY_Z: 26,
|
|
}
|
|
|
|
if event.code in ctrl_letter_map:
|
|
ctrl_char = chr(ctrl_letter_map[event.code])
|
|
os.write(self.master_fd, ctrl_char.encode())
|
|
|
|
# Ctrl+D exits the terminal
|
|
if event.code == ecodes.KEY_D:
|
|
self.stream.feed("\r\n[Exiting...]\r\n")
|
|
self.update_display()
|
|
self.running = False
|
|
break
|
|
continue
|
|
|
|
# Handle F5-F8 for quadrant zoom
|
|
if event.code == ecodes.KEY_F5:
|
|
self._safe_print(f"[DEBUG] F5 pressed - zooming to top-left quadrant")
|
|
self._quadrant_view = 'top_left'
|
|
self._quadrant_changed = True
|
|
self.update_display()
|
|
continue
|
|
elif event.code == ecodes.KEY_F6:
|
|
self._safe_print(f"[DEBUG] F6 pressed - zooming to top-right quadrant")
|
|
self._quadrant_view = 'top_right'
|
|
self._quadrant_changed = True
|
|
self.update_display()
|
|
continue
|
|
elif event.code == ecodes.KEY_F7:
|
|
self._safe_print(f"[DEBUG] F7 pressed - zooming to bottom-left quadrant")
|
|
self._quadrant_view = 'bottom_left'
|
|
self._quadrant_changed = True
|
|
self.update_display()
|
|
continue
|
|
elif event.code == ecodes.KEY_F8:
|
|
self._safe_print(f"[DEBUG] F8 pressed - zooming to bottom-right quadrant")
|
|
self._quadrant_view = 'bottom_right'
|
|
self._quadrant_changed = True
|
|
self.update_display()
|
|
continue
|
|
|
|
# Map key to character
|
|
char = key_map.get(event.code)
|
|
if char:
|
|
# Reset quadrant view on any normal keypress
|
|
if self._quadrant_view is not None:
|
|
self._safe_print(f"[DEBUG] Resetting quadrant view to full screen")
|
|
self._quadrant_view = None
|
|
self._quadrant_changed = True
|
|
self.update_display()
|
|
# Apply shift modifier (ctrl is handled separately above)
|
|
if shift_pressed and not ctrl_pressed:
|
|
if char.isalpha():
|
|
char = char.upper()
|
|
elif char in shift_map:
|
|
char = shift_map[char]
|
|
|
|
# Send to PTY
|
|
os.write(self.master_fd, char.encode())
|
|
|
|
except (OSError, IOError) as e:
|
|
# Keyboard disconnected
|
|
if self.keyboard_device:
|
|
self._safe_print(f"Keyboard disconnected, will retry...")
|
|
self.stream.feed("\r\n[Keyboard disconnected]\r\n")
|
|
self.update_display()
|
|
try:
|
|
self.keyboard_device.close()
|
|
except:
|
|
pass
|
|
self.keyboard_device = None
|
|
time.sleep(0.5)
|
|
|
|
def spawn_shell(self, command=None):
|
|
"""Spawn a shell or run a command in the virtual terminal"""
|
|
if command is None:
|
|
command = os.environ.get('SHELL', '/bin/bash')
|
|
|
|
# Set up environment
|
|
env = os.environ.copy()
|
|
env['TERM'] = 'linux'
|
|
env['COLUMNS'] = str(self.cols)
|
|
env['LINES'] = str(self.rows)
|
|
|
|
# Create a pseudo-terminal
|
|
pid, self.master_fd = pty.fork()
|
|
|
|
if pid == 0: # Child process
|
|
# Execute the shell
|
|
os.execvpe(command, [command], env)
|
|
else: # Parent process
|
|
self.running = True
|
|
|
|
# Start reading thread for terminal output
|
|
read_thread = threading.Thread(target=self.read_terminal_output)
|
|
read_thread.daemon = True
|
|
read_thread.start()
|
|
|
|
# Start input thread for user input
|
|
input_thread = threading.Thread(target=self.read_user_input)
|
|
input_thread.daemon = True
|
|
input_thread.start()
|
|
|
|
self._safe_print(f"\nVirtual terminal started (PID: {pid})")
|
|
self._safe_print("Terminal output will be displayed above and on the e-ink display")
|
|
self._safe_print("Type commands normally - they will be sent to the virtual terminal")
|
|
self._safe_print("Press Ctrl+C or Ctrl+D to exit\n")
|
|
|
|
try:
|
|
while True:
|
|
# Check if child process is still running
|
|
try:
|
|
pid_status, status = os.waitpid(pid, os.WNOHANG)
|
|
if pid_status != 0:
|
|
break
|
|
except ChildProcessError:
|
|
break
|
|
time.sleep(0.5)
|
|
except KeyboardInterrupt:
|
|
self._safe_print("\nShutting down...")
|
|
finally:
|
|
self.running = False
|
|
# Cancel any pending update and do final refresh
|
|
with self._update_lock:
|
|
if self._update_timer is not None:
|
|
self._update_timer.cancel()
|
|
self._update_timer = None
|
|
self._do_update_display()
|
|
# Give threads time to finish
|
|
time.sleep(0.2)
|
|
os.close(self.master_fd)
|
|
|
|
if self.use_display and self.epd:
|
|
try:
|
|
self._safe_print("Putting e-ink display to sleep...")
|
|
with self._silence_console():
|
|
self.epd.sleep()
|
|
except:
|
|
pass
|
|
|
|
def run_command(self, command):
|
|
"""Run a single command and display output"""
|
|
self.running = True
|
|
|
|
try:
|
|
# Run the command and capture output
|
|
result = subprocess.run(command, shell=True, capture_output=True,
|
|
text=True, timeout=30)
|
|
|
|
# Feed output to pyte
|
|
output = result.stdout + result.stderr
|
|
self.stream.feed(output)
|
|
|
|
# Update display (force immediate for single commands)
|
|
self._do_update_display()
|
|
|
|
except subprocess.TimeoutExpired:
|
|
self._safe_print("Command timed out")
|
|
except Exception as e:
|
|
self._safe_print(f"Error running command: {e}")
|
|
finally:
|
|
self.running = False
|
|
|
|
if self.use_display and self.epd:
|
|
try:
|
|
with self._silence_console():
|
|
self.epd.sleep()
|
|
except:
|
|
pass
|
|
|
|
|
|
def main():
|
|
import argparse
|
|
|
|
parser = argparse.ArgumentParser(description='Virtual Terminal for E-Ink Display')
|
|
parser.add_argument('-d', '--display',
|
|
choices=list(DISPLAY_CONFIGS.keys()),
|
|
default='epd4in2',
|
|
help='E-ink display type')
|
|
parser.add_argument('-n', '--no-display',
|
|
action='store_true',
|
|
help='Preview only, do not use actual e-ink display')
|
|
parser.add_argument('-c', '--command',
|
|
help='Run a single command instead of spawning a shell')
|
|
parser.add_argument('-s', '--shell',
|
|
help='Shell to use (default: $SHELL or /bin/bash)')
|
|
parser.add_argument('-z', '--zoom',
|
|
type=float,
|
|
default=1.0,
|
|
help='Zoom multiplier for text size (e.g., 2.0 for 2x larger text)')
|
|
parser.add_argument('-p', '--no-preview',
|
|
action='store_true',
|
|
help='Disable ASCII preview in console output (useful for service mode)')
|
|
|
|
args = parser.parse_args()
|
|
|
|
# Create virtual terminal
|
|
vt = VirtualTerminalEInk(
|
|
display_type=args.display,
|
|
use_display=not args.no_display,
|
|
zoom=args.zoom,
|
|
show_preview=not args.no_preview
|
|
)
|
|
|
|
if args.command:
|
|
# Run single command
|
|
vt.run_command(args.command)
|
|
else:
|
|
# Spawn interactive shell
|
|
shell = args.shell or os.environ.get('SHELL', '/bin/bash')
|
|
vt.spawn_shell(shell)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|