first commit
This commit is contained in:
commit
9005e49d82
3071 changed files with 1941927 additions and 0 deletions
856
virtual_terminal_eink.py
Executable file
856
virtual_terminal_eink.py
Executable file
|
|
@ -0,0 +1,856 @@
|
|||
#!/usr/bin/env python3
|
||||
# -*- coding:utf-8 -*-
|
||||
"""
|
||||
Virtual Terminal for E-Ink Display
|
||||
Spawns a virtual terminal, displays preview in console, and writes to e-ink display
|
||||
"""
|
||||
|
||||
import sys
|
||||
import os
|
||||
import time
|
||||
import threading
|
||||
import pty
|
||||
import select
|
||||
import subprocess
|
||||
import shutil
|
||||
import contextlib
|
||||
import glob
|
||||
from PIL import Image, ImageDraw, ImageFont, ImageOps
|
||||
|
||||
try:
|
||||
from evdev import InputDevice, categorize, ecodes, list_devices
|
||||
HAS_EVDEV = True
|
||||
except ImportError:
|
||||
HAS_EVDEV = False
|
||||
|
||||
# Add the waveshare library path
|
||||
libdir = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'e-Paper/RaspberryPi_JetsonNano/python/lib')
|
||||
if os.path.exists(libdir):
|
||||
sys.path.append(libdir)
|
||||
|
||||
try:
|
||||
import pyte
|
||||
except ImportError:
|
||||
print("Error: pyte library not found. Install with: pip3 install pyte")
|
||||
sys.exit(1)
|
||||
|
||||
# Configuration for different display types
|
||||
DISPLAY_CONFIGS = {
|
||||
'epd2in13_V4': {
|
||||
'width': 122,
|
||||
'height': 250,
|
||||
'module': 'epd2in13_V4',
|
||||
'char_width': 6, # Width of each character in pixels
|
||||
'char_height': 12, # Height of each character in pixels
|
||||
},
|
||||
'epd2in7': {
|
||||
'width': 176,
|
||||
'height': 264,
|
||||
'module': 'epd2in7',
|
||||
'char_width': 6,
|
||||
'char_height': 12,
|
||||
},
|
||||
'epd4in2': {
|
||||
'width': 400,
|
||||
'height': 300,
|
||||
'module': 'epd4in2',
|
||||
'char_width': 7,
|
||||
'char_height': 14,
|
||||
'font_size': 12,
|
||||
},
|
||||
'epd7in5_V2': {
|
||||
'width': 800,
|
||||
'height': 480,
|
||||
'module': 'epd7in5_V2',
|
||||
'char_width': 8,
|
||||
'char_height': 16,
|
||||
},
|
||||
}
|
||||
|
||||
class VirtualTerminalEInk:
|
||||
def __init__(self, display_type='epd2in13_V4', use_display=True, zoom=1.0):
|
||||
"""
|
||||
Initialize the virtual terminal for e-ink display
|
||||
|
||||
Args:
|
||||
display_type: Type of e-ink display (see DISPLAY_CONFIGS)
|
||||
use_display: Whether to actually use the e-ink display (False for testing)
|
||||
zoom: Zoom multiplier for rendering (1.0 = normal, 2.0 = 2x larger text, etc.)
|
||||
"""
|
||||
self.display_type = display_type
|
||||
self.use_display = use_display
|
||||
self.zoom = max(1.0, float(zoom)) # Ensure zoom is at least 1.0
|
||||
self.config = DISPLAY_CONFIGS.get(display_type)
|
||||
|
||||
# Serialize console output (preview redraw + status messages) so it doesn't
|
||||
# get interleaved with background threads or noisy hardware drivers.
|
||||
self._console_lock = threading.RLock()
|
||||
|
||||
if not self.config:
|
||||
raise ValueError(f"Unknown display type: {display_type}")
|
||||
|
||||
# Load font and measure actual character dimensions
|
||||
self.font = self._load_font()
|
||||
self._measure_font_metrics()
|
||||
|
||||
# Calculate terminal size in characters (accounting for zoom)
|
||||
# With zoom, we render to a smaller virtual terminal and scale it up
|
||||
self.cols = int(self.config['width'] / (self.char_width * self.zoom))
|
||||
self.rows = int(self.config['height'] / (self.char_height * self.zoom))
|
||||
|
||||
self._safe_print(f"Display: {display_type} ({self.config['width']}x{self.config['height']})")
|
||||
self._safe_print(f"Font metrics: {self.char_width}x{self.char_height} pixels per character")
|
||||
self._safe_print(f"Terminal size: {self.cols}x{self.rows} characters")
|
||||
|
||||
# Initialize pyte screen
|
||||
self.screen = pyte.Screen(self.cols, self.rows)
|
||||
self.stream = pyte.Stream(self.screen)
|
||||
|
||||
# Track screen deltas for partial refresh.
|
||||
self._last_snapshot = None
|
||||
self._last_cursor = None
|
||||
self._partial_ready = False
|
||||
self._partial_enabled = True
|
||||
|
||||
# Debounce display updates to batch rapid changes (typing)
|
||||
self._update_debounce_ms = 60 # milliseconds to wait before refreshing
|
||||
self._pending_update = False
|
||||
self._update_timer = None
|
||||
self._update_lock = threading.Lock()
|
||||
|
||||
# Initialize e-ink display
|
||||
self.epd = None
|
||||
if self.use_display:
|
||||
try:
|
||||
module = __import__(f'waveshare_epd.{self.config["module"]}', fromlist=[self.config['module']])
|
||||
self.epd = module.EPD()
|
||||
self._safe_print("Initializing e-ink display...")
|
||||
with self._silence_console():
|
||||
self.epd.init()
|
||||
# Call Clear without arguments for displays that don't take parameters
|
||||
try:
|
||||
self.epd.Clear(0xFF)
|
||||
except TypeError:
|
||||
self.epd.Clear()
|
||||
self._safe_print("E-ink display ready!")
|
||||
|
||||
# If supported, switch the panel into partial-update mode so we can
|
||||
# update only the changed region (e.g., per typed character).
|
||||
self._prepare_partial_mode_if_supported()
|
||||
except Exception as e:
|
||||
self._safe_print(f"Warning: Could not initialize e-ink display: {e}")
|
||||
self._safe_print("Running in preview-only mode")
|
||||
self.use_display = False
|
||||
|
||||
# Terminal process
|
||||
self.master_fd = None
|
||||
self.running = False
|
||||
|
||||
# Direct keyboard input (headless mode)
|
||||
self.keyboard_device = None
|
||||
self.keyboard_path = None
|
||||
self.keyboard_reconnect_interval = 2.0 # seconds
|
||||
|
||||
def _safe_print(self, *args, **kwargs):
|
||||
# When the terminal is put into raw-ish modes (see read_user_input), some
|
||||
# terminals stop translating '\n' into CRLF. Using CRLF avoids "stair-step"
|
||||
# indentation in the preview.
|
||||
with self._console_lock:
|
||||
end = kwargs.pop('end', '\r\n')
|
||||
flush = kwargs.pop('flush', True)
|
||||
print(*args, end=end, flush=flush, **kwargs)
|
||||
|
||||
@contextlib.contextmanager
|
||||
def _silence_console(self):
|
||||
"""Temporarily silence stdout/stderr (e.g., Waveshare driver prints)."""
|
||||
with open(os.devnull, 'w') as devnull:
|
||||
with contextlib.redirect_stdout(devnull), contextlib.redirect_stderr(devnull):
|
||||
yield
|
||||
|
||||
def _epd_supports_partial_window(self):
|
||||
"""Return True if the active driver exposes a windowed partial refresh API."""
|
||||
if not self.epd:
|
||||
return False
|
||||
|
||||
# epd4in2 exposes a dedicated region updater.
|
||||
if hasattr(self.epd, 'EPD_4IN2_PartialDisplay'):
|
||||
return True
|
||||
|
||||
# Many other drivers expose display_Partial(Image, Xstart, Ystart, Xend, Yend)
|
||||
if hasattr(self.epd, 'display_Partial'):
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def _prepare_partial_mode_if_supported(self):
|
||||
if not (self.use_display and self.epd and self._partial_enabled):
|
||||
return
|
||||
if not self._epd_supports_partial_window():
|
||||
return
|
||||
|
||||
# Some drivers require switching LUT/mode via init_Partial.
|
||||
if hasattr(self.epd, 'init_Partial'):
|
||||
try:
|
||||
with self._silence_console():
|
||||
self.epd.init_Partial()
|
||||
self._partial_ready = True
|
||||
except Exception:
|
||||
self._partial_ready = False
|
||||
|
||||
def _snapshot_screen(self):
|
||||
"""Return a compact snapshot of the current pyte screen state."""
|
||||
snapshot = []
|
||||
buf = self.screen.buffer
|
||||
for row in range(self.rows):
|
||||
row_cells = []
|
||||
line = buf[row]
|
||||
for col in range(self.cols):
|
||||
ch = line[col]
|
||||
row_cells.append((ch.data, bool(getattr(ch, 'reverse', False))))
|
||||
snapshot.append(row_cells)
|
||||
return snapshot
|
||||
|
||||
def _compute_dirty_cell_bbox(self, prev_snapshot, curr_snapshot, prev_cursor, curr_cursor):
|
||||
"""Compute bounding box of changed cells (inclusive). Returns (min_c, min_r, max_c, max_r) or None."""
|
||||
if prev_snapshot is None:
|
||||
return (0, 0, self.cols - 1, self.rows - 1)
|
||||
|
||||
min_c = None
|
||||
min_r = None
|
||||
max_c = None
|
||||
max_r = None
|
||||
|
||||
for r in range(self.rows):
|
||||
prev_row = prev_snapshot[r]
|
||||
curr_row = curr_snapshot[r]
|
||||
for c in range(self.cols):
|
||||
if prev_row[c] != curr_row[c]:
|
||||
if min_c is None:
|
||||
min_c = max_c = c
|
||||
min_r = max_r = r
|
||||
else:
|
||||
min_c = min(min_c, c)
|
||||
max_c = max(max_c, c)
|
||||
min_r = min(min_r, r)
|
||||
max_r = max(max_r, r)
|
||||
|
||||
# Always refresh old + new cursor cells so underline moves cleanly.
|
||||
for cur in (prev_cursor, curr_cursor):
|
||||
if not cur:
|
||||
continue
|
||||
c, r = cur
|
||||
if 0 <= c < self.cols and 0 <= r < self.rows:
|
||||
if min_c is None:
|
||||
min_c = max_c = c
|
||||
min_r = max_r = r
|
||||
else:
|
||||
min_c = min(min_c, c)
|
||||
max_c = max(max_c, c)
|
||||
min_r = min(min_r, r)
|
||||
max_r = max(max_r, r)
|
||||
|
||||
if min_c is None:
|
||||
return None
|
||||
return (min_c, min_r, max_c, max_r)
|
||||
|
||||
def _cell_bbox_to_pixel_window(self, cell_bbox):
|
||||
"""Convert cell bbox to pixel window (x_start, y_start, x_end, y_end), end-exclusive."""
|
||||
min_c, min_r, max_c, max_r = cell_bbox
|
||||
x_start = min_c * self.char_width
|
||||
y_start = min_r * self.char_height
|
||||
x_end = (max_c + 1) * self.char_width
|
||||
y_end = (max_r + 1) * self.char_height
|
||||
|
||||
# Expand Y slightly to ensure cursor underline and descenders are fully captured
|
||||
y_end = min(y_end + 2, self.config['height'])
|
||||
|
||||
# Clamp to display bounds.
|
||||
x_start = max(0, min(x_start, self.config['width']))
|
||||
y_start = max(0, min(y_start, self.config['height']))
|
||||
x_end = max(0, min(x_end, self.config['width']))
|
||||
y_end = max(0, min(y_end, self.config['height']))
|
||||
|
||||
# Align X to byte boundaries (8px) for most controllers.
|
||||
x_start = (x_start // 8) * 8
|
||||
x_end = ((x_end + 7) // 8) * 8
|
||||
x_end = max(x_start + 8, min(x_end, self.config['width']))
|
||||
|
||||
# Ensure non-empty window.
|
||||
if y_end <= y_start:
|
||||
y_end = min(y_start + self.char_height, self.config['height'])
|
||||
|
||||
return (x_start, y_start, x_end, y_end)
|
||||
|
||||
def _find_keyboard_device(self):
|
||||
"""Find a keyboard input device, preferring Bluetooth keyboards."""
|
||||
if not HAS_EVDEV:
|
||||
return None
|
||||
|
||||
try:
|
||||
devices = [InputDevice(path) for path in list_devices()]
|
||||
keyboards = []
|
||||
|
||||
for device in devices:
|
||||
# Check if device has keyboard capabilities
|
||||
caps = device.capabilities()
|
||||
if ecodes.EV_KEY in caps:
|
||||
# Check for typical keyboard keys
|
||||
keys = caps[ecodes.EV_KEY]
|
||||
has_letters = any(k in keys for k in range(ecodes.KEY_Q, ecodes.KEY_P + 1))
|
||||
if has_letters:
|
||||
# Prefer Bluetooth keyboards
|
||||
is_bluetooth = 'bluetooth' in device.name.lower() or 'bt' in device.name.lower()
|
||||
keyboards.append((device, is_bluetooth))
|
||||
|
||||
if not keyboards:
|
||||
return None
|
||||
|
||||
# Sort by Bluetooth preference
|
||||
keyboards.sort(key=lambda x: x[1], reverse=True)
|
||||
selected = keyboards[0][0]
|
||||
self._safe_print(f"Found keyboard: {selected.name} ({selected.path})")
|
||||
return selected
|
||||
except Exception as e:
|
||||
self._safe_print(f"Error finding keyboard: {e}")
|
||||
return None
|
||||
|
||||
def _epd_sync_partial_base_from_buffer(self, buffer_bytes):
|
||||
"""Keep driver-side 'old image' buffer in sync for partial updates."""
|
||||
if not self.epd or not hasattr(self.epd, 'DATA'):
|
||||
return
|
||||
try:
|
||||
# The driver inverts the buffer itself during partial refresh,
|
||||
# so we store it as-is (not inverted).
|
||||
self.epd.DATA = list(buffer_bytes)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def _load_font(self):
|
||||
"""Load a monospace font"""
|
||||
font_size = self.config.get('font_size', self.config['char_height'] - 2)
|
||||
|
||||
# Try to use a monospace font, fall back to default if not available
|
||||
for font_path in [
|
||||
'/usr/share/fonts/truetype/dejavu/DejaVuSansMono.ttf',
|
||||
'/usr/share/fonts/truetype/liberation/LiberationMono-Regular.ttf',
|
||||
'/usr/share/fonts/truetype/freefont/FreeMono.ttf',
|
||||
]:
|
||||
try:
|
||||
return ImageFont.truetype(font_path, font_size)
|
||||
except:
|
||||
continue
|
||||
|
||||
return ImageFont.load_default()
|
||||
|
||||
def _measure_font_metrics(self):
|
||||
"""Measure actual character dimensions from the loaded font"""
|
||||
# Create a temporary image to measure text
|
||||
temp_img = Image.new('1', (200, 100), 255)
|
||||
draw = ImageDraw.Draw(temp_img)
|
||||
|
||||
# Measure multiple characters to get accurate width
|
||||
test_str = "MMMMMMMMMM" # 10 M's
|
||||
bbox = draw.textbbox((0, 0), test_str, font=self.font)
|
||||
avg_width = (bbox[2] - bbox[0]) / 10
|
||||
|
||||
# Measure height
|
||||
bbox_height = draw.textbbox((0, 0), 'M', font=self.font)
|
||||
height = bbox_height[3] - bbox_height[1]
|
||||
|
||||
# Round up to ensure characters don't overlap
|
||||
self.char_width = int(avg_width) + 1
|
||||
self.char_height = int(height) + 2
|
||||
|
||||
# Ensure minimum dimensions
|
||||
if self.char_width < 6:
|
||||
self.char_width = 6
|
||||
if self.char_height < 10:
|
||||
self.char_height = 10
|
||||
|
||||
self._safe_print(f"DEBUG: Measured char dimensions - width: {self.char_width}, height: {self.char_height}")
|
||||
|
||||
def render_to_image(self):
|
||||
"""Render the virtual terminal screen to a PIL Image"""
|
||||
# Create a white image
|
||||
image = Image.new('1', (self.config['width'], self.config['height']), 255)
|
||||
draw = ImageDraw.Draw(image)
|
||||
|
||||
# Draw each character
|
||||
for row in range(self.rows):
|
||||
for col in range(self.cols):
|
||||
char = self.screen.buffer[row][col]
|
||||
if char.data != ' ':
|
||||
x = col * self.char_width
|
||||
y = row * self.char_height
|
||||
|
||||
# Draw character (0 = black, 255 = white)
|
||||
if char.reverse:
|
||||
# Inverted colors for reverse video
|
||||
draw.rectangle([(x, y), (x + self.char_width,
|
||||
y + self.char_height)], fill=0)
|
||||
draw.text((x, y), char.data, font=self.font, fill=255)
|
||||
else:
|
||||
draw.text((x, y), char.data, font=self.font, fill=0)
|
||||
|
||||
# Draw cursor
|
||||
cursor_row = self.screen.cursor.y
|
||||
cursor_col = self.screen.cursor.x
|
||||
cursor_x = cursor_col * self.char_width
|
||||
cursor_y = cursor_row * self.char_height
|
||||
draw.rectangle([(cursor_x, cursor_y + self.char_height - 2),
|
||||
(cursor_x + self.char_width, cursor_y + self.char_height)],
|
||||
fill=0)
|
||||
|
||||
return image
|
||||
|
||||
def display_preview(self):
|
||||
"""Display ASCII preview of the terminal in the console"""
|
||||
with self._console_lock:
|
||||
term_size = shutil.get_terminal_size(fallback=(80, 24))
|
||||
term_cols = max(20, term_size.columns)
|
||||
term_lines = max(10, term_size.lines)
|
||||
|
||||
# Keep a small bottom area for status lines.
|
||||
max_visible_rows = max(1, term_lines - 4)
|
||||
visible_rows = min(self.rows, max_visible_rows)
|
||||
|
||||
# Clamp preview width to terminal width so the border never wraps.
|
||||
max_visible_cols = max(10, term_cols - 2)
|
||||
visible_cols = min(self.cols, max_visible_cols)
|
||||
|
||||
cursor_col = self.screen.cursor.x
|
||||
cursor_row = self.screen.cursor.y
|
||||
|
||||
# If the virtual terminal is wider than our real terminal, horizontally scroll
|
||||
# so the cursor stays visible.
|
||||
if visible_cols < self.cols:
|
||||
start_col = cursor_col - (visible_cols // 2)
|
||||
start_col = max(0, min(start_col, self.cols - visible_cols))
|
||||
else:
|
||||
start_col = 0
|
||||
|
||||
# Clear screen and redraw in-place.
|
||||
sys.stdout.write("\x1b[2J\x1b[H")
|
||||
|
||||
border = "=" * (visible_cols + 2)
|
||||
sys.stdout.write(border + "\r\n")
|
||||
|
||||
for row in range(visible_rows):
|
||||
# Some pyte Line implementations support slicing; if not, fall back.
|
||||
try:
|
||||
row_slice = self.screen.buffer[row][start_col:start_col + visible_cols]
|
||||
line = "".join(ch.data for ch in row_slice)
|
||||
except Exception:
|
||||
line_chars = []
|
||||
for col in range(start_col, start_col + visible_cols):
|
||||
line_chars.append(self.screen.buffer[row][col].data)
|
||||
line = "".join(line_chars)
|
||||
|
||||
# Pad to keep the right border aligned.
|
||||
if len(line) < visible_cols:
|
||||
line = line.ljust(visible_cols)
|
||||
sys.stdout.write("|" + line + "|\r\n")
|
||||
|
||||
sys.stdout.write(border + "\r\n")
|
||||
if start_col:
|
||||
sys.stdout.write(
|
||||
f"Cursor at: ({cursor_col}, {cursor_row}) view: cols {start_col}-{start_col + visible_cols - 1}\r\n"
|
||||
)
|
||||
else:
|
||||
sys.stdout.write(f"Cursor at: ({cursor_col}, {cursor_row})\r\n")
|
||||
|
||||
sys.stdout.flush()
|
||||
|
||||
def _do_update_display(self):
|
||||
"""Internal: Actually perform the display update (called after debounce)."""
|
||||
with self._console_lock:
|
||||
# Display preview in console
|
||||
self.display_preview()
|
||||
|
||||
# Update e-ink display (silence driver prints to avoid tearing the preview)
|
||||
if self.use_display and self.epd:
|
||||
try:
|
||||
curr_snapshot = self._snapshot_screen()
|
||||
curr_cursor = (self.screen.cursor.x, self.screen.cursor.y)
|
||||
dirty_cells = self._compute_dirty_cell_bbox(
|
||||
self._last_snapshot,
|
||||
curr_snapshot,
|
||||
self._last_cursor,
|
||||
curr_cursor,
|
||||
)
|
||||
|
||||
# Nothing changed -> skip panel update.
|
||||
if dirty_cells is None:
|
||||
self._last_snapshot = curr_snapshot
|
||||
self._last_cursor = curr_cursor
|
||||
return
|
||||
|
||||
# Heuristic: large changes -> full refresh.
|
||||
min_c, min_r, max_c, max_r = dirty_cells
|
||||
changed_area = (max_c - min_c + 1) * (max_r - min_r + 1)
|
||||
total_area = self.cols * self.rows
|
||||
do_full = (self._last_snapshot is None) or (changed_area > total_area * 0.35)
|
||||
|
||||
image = self.render_to_image()
|
||||
|
||||
# Apply zoom if needed
|
||||
if self.zoom > 1.0:
|
||||
new_width = int(image.width * self.zoom)
|
||||
new_height = int(image.height * self.zoom)
|
||||
# Use NEAREST for crisp text scaling
|
||||
image = image.resize((new_width, new_height), Image.NEAREST)
|
||||
# Crop to display size if zoomed image is larger
|
||||
if new_width > self.config['width'] or new_height > self.config['height']:
|
||||
image = image.crop((0, 0, self.config['width'], self.config['height']))
|
||||
|
||||
# Prefer partial window update when supported.
|
||||
if (not do_full) and self._partial_enabled and self._epd_supports_partial_window():
|
||||
if not self._partial_ready:
|
||||
self._prepare_partial_mode_if_supported()
|
||||
|
||||
x_start, y_start, x_end, y_end = self._cell_bbox_to_pixel_window(dirty_cells)
|
||||
|
||||
# Partial refresh: EPD_4IN2_PartialDisplay inverts internally, so use raw buffer
|
||||
buffer_bytes = self.epd.getbuffer(image)
|
||||
|
||||
with self._silence_console():
|
||||
if hasattr(self.epd, 'EPD_4IN2_PartialDisplay'):
|
||||
# epd4in2 expects full-frame buffer and a pixel window.
|
||||
# The driver inverts and stores in DATA internally, so we don't sync here.
|
||||
self.epd.EPD_4IN2_PartialDisplay(x_start, y_start, x_end, y_end, buffer_bytes)
|
||||
elif hasattr(self.epd, 'display_Partial'):
|
||||
# Many drivers accept a PIL Image + window. If this driver
|
||||
# has a different signature, it will raise and we fall back.
|
||||
try:
|
||||
self.epd.display_Partial(image, x_start, y_start, x_end, y_end)
|
||||
except TypeError:
|
||||
# Some implementations only accept a single image.
|
||||
self.epd.display_Partial(image)
|
||||
# Sync for other drivers that may not maintain DATA
|
||||
self._epd_sync_partial_base_from_buffer(buffer_bytes)
|
||||
else:
|
||||
# Full refresh: display() expects inverted polarity
|
||||
image_inverted = ImageOps.invert(image.convert('L')).convert('1')
|
||||
buffer_bytes = self.epd.getbuffer(image_inverted)
|
||||
|
||||
with self._silence_console():
|
||||
self.epd.display(buffer_bytes)
|
||||
self._epd_sync_partial_base_from_buffer(buffer_bytes)
|
||||
|
||||
# Re-enter partial mode after a full refresh if the driver supports it.
|
||||
self._partial_ready = False
|
||||
self._prepare_partial_mode_if_supported()
|
||||
|
||||
self._last_snapshot = curr_snapshot
|
||||
self._last_cursor = curr_cursor
|
||||
except Exception as e:
|
||||
self._safe_print(f"Error updating e-ink display: {e}")
|
||||
|
||||
def update_display(self):
|
||||
"""Schedule a debounced display update to batch rapid changes together."""
|
||||
with self._update_lock:
|
||||
# Cancel any pending timer
|
||||
if self._update_timer is not None:
|
||||
self._update_timer.cancel()
|
||||
|
||||
# Schedule new update after debounce period
|
||||
self._update_timer = threading.Timer(
|
||||
self._update_debounce_ms / 1000.0,
|
||||
self._do_update_display
|
||||
)
|
||||
self._update_timer.daemon = True
|
||||
self._update_timer.start()
|
||||
|
||||
def read_terminal_output(self):
|
||||
"""Read output from the terminal and feed it to pyte"""
|
||||
while self.running:
|
||||
# Check if there's data to read
|
||||
r, w, e = select.select([self.master_fd], [], [], 0.1)
|
||||
if self.master_fd in r:
|
||||
try:
|
||||
data = os.read(self.master_fd, 1024)
|
||||
if data:
|
||||
# Feed data to pyte stream
|
||||
self.stream.feed(data.decode('utf-8', errors='replace'))
|
||||
# Update display
|
||||
self.update_display()
|
||||
except OSError:
|
||||
break
|
||||
|
||||
def read_user_input(self):
|
||||
"""Read user input directly from keyboard device with auto-reconnection."""
|
||||
if not HAS_EVDEV:
|
||||
self._safe_print("Warning: evdev not available, install with: pip3 install evdev")
|
||||
return
|
||||
|
||||
# Proper QWERTY keyboard layout mapping based on actual evdev keycodes
|
||||
key_map = {
|
||||
# Special keys
|
||||
ecodes.KEY_ENTER: '\n',
|
||||
ecodes.KEY_TAB: '\t',
|
||||
ecodes.KEY_SPACE: ' ',
|
||||
ecodes.KEY_BACKSPACE: '\x7f',
|
||||
ecodes.KEY_ESC: '\x1b',
|
||||
ecodes.KEY_MINUS: '-',
|
||||
ecodes.KEY_EQUAL: '=',
|
||||
ecodes.KEY_LEFTBRACE: '[',
|
||||
ecodes.KEY_RIGHTBRACE: ']',
|
||||
ecodes.KEY_SEMICOLON: ';',
|
||||
ecodes.KEY_APOSTROPHE: "'",
|
||||
ecodes.KEY_GRAVE: '`',
|
||||
ecodes.KEY_BACKSLASH: '\\',
|
||||
ecodes.KEY_COMMA: ',',
|
||||
ecodes.KEY_DOT: '.',
|
||||
ecodes.KEY_SLASH: '/',
|
||||
|
||||
# Number row (unshifted)
|
||||
ecodes.KEY_1: '1', ecodes.KEY_2: '2', ecodes.KEY_3: '3',
|
||||
ecodes.KEY_4: '4', ecodes.KEY_5: '5', ecodes.KEY_6: '6',
|
||||
ecodes.KEY_7: '7', ecodes.KEY_8: '8', ecodes.KEY_9: '9',
|
||||
ecodes.KEY_0: '0',
|
||||
|
||||
# Letter keys (lowercase)
|
||||
ecodes.KEY_Q: 'q', ecodes.KEY_W: 'w', ecodes.KEY_E: 'e', ecodes.KEY_R: 'r',
|
||||
ecodes.KEY_T: 't', ecodes.KEY_Y: 'y', ecodes.KEY_U: 'u', ecodes.KEY_I: 'i',
|
||||
ecodes.KEY_O: 'o', ecodes.KEY_P: 'p', ecodes.KEY_A: 'a', ecodes.KEY_S: 's',
|
||||
ecodes.KEY_D: 'd', ecodes.KEY_F: 'f', ecodes.KEY_G: 'g', ecodes.KEY_H: 'h',
|
||||
ecodes.KEY_J: 'j', ecodes.KEY_K: 'k', ecodes.KEY_L: 'l', ecodes.KEY_Z: 'z',
|
||||
ecodes.KEY_X: 'x', ecodes.KEY_C: 'c', ecodes.KEY_V: 'v', ecodes.KEY_B: 'b',
|
||||
ecodes.KEY_N: 'n', ecodes.KEY_M: 'm',
|
||||
}
|
||||
|
||||
# Shifted character mappings
|
||||
shift_map = {
|
||||
'1': '!', '2': '@', '3': '#', '4': '$', '5': '%',
|
||||
'6': '^', '7': '&', '8': '*', '9': '(', '0': ')',
|
||||
'-': '_', '=': '+', '[': '{', ']': '}', '\\': '|',
|
||||
';': ':', "'": '"', '`': '~', ',': '<', '.': '>', '/': '?',
|
||||
}
|
||||
|
||||
shift_pressed = False
|
||||
ctrl_pressed = False
|
||||
last_reconnect_attempt = 0
|
||||
|
||||
while self.running:
|
||||
# Try to connect/reconnect to keyboard
|
||||
if self.keyboard_device is None:
|
||||
current_time = time.time()
|
||||
if current_time - last_reconnect_attempt >= self.keyboard_reconnect_interval:
|
||||
last_reconnect_attempt = current_time
|
||||
self.keyboard_device = self._find_keyboard_device()
|
||||
if self.keyboard_device:
|
||||
self.keyboard_path = self.keyboard_device.path
|
||||
# Display connection status on screen
|
||||
self.stream.feed(f"\r\n[Keyboard connected: {self.keyboard_device.name}]\r\n")
|
||||
self.update_display()
|
||||
else:
|
||||
time.sleep(0.5)
|
||||
continue
|
||||
|
||||
try:
|
||||
# Check device is still valid before select
|
||||
if self.keyboard_device is None:
|
||||
time.sleep(0.1)
|
||||
continue
|
||||
|
||||
# Read keyboard events with timeout
|
||||
r, w, e = select.select([self.keyboard_device.fd], [], [], 0.1)
|
||||
if not r:
|
||||
continue
|
||||
|
||||
for event in self.keyboard_device.read():
|
||||
if event.type == ecodes.EV_KEY:
|
||||
key_event = categorize(event)
|
||||
|
||||
# Track modifier keys
|
||||
if event.code in (ecodes.KEY_LEFTSHIFT, ecodes.KEY_RIGHTSHIFT):
|
||||
shift_pressed = (event.value == 1)
|
||||
continue
|
||||
elif event.code in (ecodes.KEY_LEFTCTRL, ecodes.KEY_RIGHTCTRL):
|
||||
ctrl_pressed = (event.value == 1)
|
||||
continue
|
||||
|
||||
# Only process key press (not release)
|
||||
if event.value != 1:
|
||||
continue
|
||||
|
||||
# Handle Ctrl+D to exit
|
||||
if ctrl_pressed and event.code == ecodes.KEY_D:
|
||||
self.stream.feed("\r\n[Exiting...]\r\n")
|
||||
self.update_display()
|
||||
self.running = False
|
||||
break
|
||||
|
||||
# Handle Ctrl+C
|
||||
if ctrl_pressed and event.code == ecodes.KEY_C:
|
||||
os.write(self.master_fd, b'\x03')
|
||||
continue
|
||||
|
||||
# Map key to character
|
||||
char = key_map.get(event.code)
|
||||
if char:
|
||||
# Apply shift modifier
|
||||
if shift_pressed:
|
||||
if char.isalpha():
|
||||
char = char.upper()
|
||||
elif char in shift_map:
|
||||
char = shift_map[char]
|
||||
|
||||
# Apply ctrl modifier (for letters only)
|
||||
if ctrl_pressed and char.isalpha():
|
||||
char = chr(ord(char.upper()) - ord('A') + 1)
|
||||
|
||||
# Send to PTY
|
||||
os.write(self.master_fd, char.encode())
|
||||
|
||||
except (OSError, IOError) as e:
|
||||
# Keyboard disconnected
|
||||
if self.keyboard_device:
|
||||
self._safe_print(f"Keyboard disconnected, will retry...")
|
||||
self.stream.feed("\r\n[Keyboard disconnected]\r\n")
|
||||
self.update_display()
|
||||
try:
|
||||
self.keyboard_device.close()
|
||||
except:
|
||||
pass
|
||||
self.keyboard_device = None
|
||||
time.sleep(0.5)
|
||||
|
||||
def spawn_shell(self, command=None):
|
||||
"""Spawn a shell or run a command in the virtual terminal"""
|
||||
if command is None:
|
||||
command = os.environ.get('SHELL', '/bin/bash')
|
||||
|
||||
# Set up environment
|
||||
env = os.environ.copy()
|
||||
env['TERM'] = 'linux'
|
||||
env['COLUMNS'] = str(self.cols)
|
||||
env['LINES'] = str(self.rows)
|
||||
|
||||
# Create a pseudo-terminal
|
||||
pid, self.master_fd = pty.fork()
|
||||
|
||||
if pid == 0: # Child process
|
||||
# Execute the shell
|
||||
os.execvpe(command, [command], env)
|
||||
else: # Parent process
|
||||
self.running = True
|
||||
|
||||
# Start reading thread for terminal output
|
||||
read_thread = threading.Thread(target=self.read_terminal_output)
|
||||
read_thread.daemon = True
|
||||
read_thread.start()
|
||||
|
||||
# Start input thread for user input
|
||||
input_thread = threading.Thread(target=self.read_user_input)
|
||||
input_thread.daemon = True
|
||||
input_thread.start()
|
||||
|
||||
self._safe_print(f"\nVirtual terminal started (PID: {pid})")
|
||||
self._safe_print("Terminal output will be displayed above and on the e-ink display")
|
||||
self._safe_print("Type commands normally - they will be sent to the virtual terminal")
|
||||
self._safe_print("Press Ctrl+C or Ctrl+D to exit\n")
|
||||
|
||||
try:
|
||||
while True:
|
||||
# Check if child process is still running
|
||||
try:
|
||||
pid_status, status = os.waitpid(pid, os.WNOHANG)
|
||||
if pid_status != 0:
|
||||
break
|
||||
except ChildProcessError:
|
||||
break
|
||||
time.sleep(0.5)
|
||||
except KeyboardInterrupt:
|
||||
self._safe_print("\nShutting down...")
|
||||
finally:
|
||||
self.running = False
|
||||
# Cancel any pending update and do final refresh
|
||||
with self._update_lock:
|
||||
if self._update_timer is not None:
|
||||
self._update_timer.cancel()
|
||||
self._update_timer = None
|
||||
self._do_update_display()
|
||||
# Give threads time to finish
|
||||
time.sleep(0.2)
|
||||
os.close(self.master_fd)
|
||||
|
||||
if self.use_display and self.epd:
|
||||
try:
|
||||
self._safe_print("Putting e-ink display to sleep...")
|
||||
with self._silence_console():
|
||||
self.epd.sleep()
|
||||
except:
|
||||
pass
|
||||
|
||||
def run_command(self, command):
|
||||
"""Run a single command and display output"""
|
||||
self.running = True
|
||||
|
||||
try:
|
||||
# Run the command and capture output
|
||||
result = subprocess.run(command, shell=True, capture_output=True,
|
||||
text=True, timeout=30)
|
||||
|
||||
# Feed output to pyte
|
||||
output = result.stdout + result.stderr
|
||||
self.stream.feed(output)
|
||||
|
||||
# Update display (force immediate for single commands)
|
||||
self._do_update_display()
|
||||
|
||||
except subprocess.TimeoutExpired:
|
||||
self._safe_print("Command timed out")
|
||||
except Exception as e:
|
||||
self._safe_print(f"Error running command: {e}")
|
||||
finally:
|
||||
self.running = False
|
||||
|
||||
if self.use_display and self.epd:
|
||||
try:
|
||||
with self._silence_console():
|
||||
self.epd.sleep()
|
||||
except:
|
||||
pass
|
||||
|
||||
|
||||
def main():
|
||||
import argparse
|
||||
|
||||
parser = argparse.ArgumentParser(description='Virtual Terminal for E-Ink Display')
|
||||
parser.add_argument('-d', '--display',
|
||||
choices=list(DISPLAY_CONFIGS.keys()),
|
||||
default='epd4in2',
|
||||
help='E-ink display type')
|
||||
parser.add_argument('-n', '--no-display',
|
||||
action='store_true',
|
||||
help='Preview only, do not use actual e-ink display')
|
||||
parser.add_argument('-c', '--command',
|
||||
help='Run a single command instead of spawning a shell')
|
||||
parser.add_argument('-s', '--shell',
|
||||
help='Shell to use (default: $SHELL or /bin/bash)')
|
||||
parser.add_argument('-z', '--zoom',
|
||||
type=float,
|
||||
default=1.0,
|
||||
help='Zoom multiplier for text size (e.g., 2.0 for 2x larger text)')
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
# Create virtual terminal
|
||||
vt = VirtualTerminalEInk(
|
||||
display_type=args.display,
|
||||
use_display=not args.no_display,
|
||||
zoom=args.zoom
|
||||
)
|
||||
|
||||
if args.command:
|
||||
# Run single command
|
||||
vt.run_command(args.command)
|
||||
else:
|
||||
# Spawn interactive shell
|
||||
shell = args.shell or os.environ.get('SHELL', '/bin/bash')
|
||||
vt.spawn_shell(shell)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
Loading…
Add table
Add a link
Reference in a new issue