#!/usr/bin/env python3
"""
Poll Proton Bridge via IMAP and inject new messages into a local SMTP server.

- Assumes Proton Bridge IMAP is running locally.
- Assumes an SMTP server is listening on a local port (e.g. 2525).
"""

import atexit
import errno
import imaplib
import json
import logging
import os
import smtplib
import ssl
import sys
import time
from dataclasses import dataclass
from email import message_from_bytes
from email.message import Message
from email.utils import getaddresses, parseaddr
from typing import Iterable, List, Optional

LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()
logging.basicConfig(
    level=LOG_LEVEL,
    format="[email_tunnel] %(asctime)s %(levelname)s %(message)s",
)
logger = logging.getLogger("email_tunnel")

PIDFILE_PATH = os.getenv("PIDFILE", "/tmp/email_tunnel.pid")

# Number of tries for IMAP FETCH / UID SEARCH and for SMTP delivery.
_MAX_ATTEMPTS = 3

# Values of IMAP_TLS_VERIFY (compared case-insensitively) that disable
# certificate verification.
_FALSE_STRINGS = ("0", "false", "no", "off")


def _retry_delay(attempt: int) -> int:
    """Return the back-off in seconds after failed attempt number *attempt*."""
    return min(5, attempt * 2)


@dataclass(frozen=True)
class Config:
    """Immutable runtime settings, normally built with :meth:`from_env`."""

    imap_host: str
    imap_port: int
    imap_user: str
    imap_password: str
    imap_folder: str
    imap_tls_verify: bool
    smtp_host: str
    smtp_port: int
    poll_interval: int
    state_file: str

    @classmethod
    def from_env(cls) -> "Config":
        """Build a Config from environment variables.

        Required: IMAP_IP, IMAP_PORT, SMTP_IP, SMTP_PORT, POLL_INTERVAL,
        IMAP_USER, IMAP_PASSWORD, IMAP_FOLDER, IMAP_TLS_VERIFY.
        Optional: STATE_FILE (defaults to "./state.json").

        Raises:
            ValueError: if a required variable is unset or empty, or if a
                numeric variable is not a positive integer.
        """
        missing: List[str] = []

        def require(name: str) -> str:
            value = os.getenv(name)
            if not value:
                missing.append(name)
            return value or ""

        imap_host = require("IMAP_IP")
        imap_port_raw = require("IMAP_PORT")
        smtp_host = require("SMTP_IP")
        smtp_port_raw = require("SMTP_PORT")
        poll_interval_raw = require("POLL_INTERVAL")
        imap_user = require("IMAP_USER")
        imap_password = require("IMAP_PASSWORD")
        imap_folder = require("IMAP_FOLDER")
        imap_tls_verify_raw = require("IMAP_TLS_VERIFY")

        # Every string setting is known to be non-empty past this point, so
        # no further emptiness checks are needed.
        if missing:
            raise ValueError(f"Missing required environment variables: {', '.join(missing)}")

        raw_numbers = {
            "IMAP_PORT": imap_port_raw,
            "SMTP_PORT": smtp_port_raw,
            "POLL_INTERVAL": poll_interval_raw,
        }
        # Parse everything first, then range-check, so "not an integer"
        # errors are reported before "not positive" errors.
        numbers = {}
        for name, raw in raw_numbers.items():
            try:
                numbers[name] = int(raw)
            except ValueError as exc:
                raise ValueError(f"{name} must be an integer") from exc
        for name, value in numbers.items():
            if value <= 0:
                raise ValueError(f"{name} must be a positive integer")

        return cls(
            imap_host=imap_host,
            imap_port=numbers["IMAP_PORT"],
            imap_user=imap_user,
            imap_password=imap_password,
            imap_folder=imap_folder,
            imap_tls_verify=imap_tls_verify_raw.strip().lower() not in _FALSE_STRINGS,
            smtp_host=smtp_host,
            smtp_port=numbers["SMTP_PORT"],
            poll_interval=numbers["POLL_INTERVAL"],
            state_file=os.getenv("STATE_FILE", "./state.json"),
        )


class FatalConfigError(RuntimeError):
    """Raised for configuration problems that require immediate exit."""


class DeliveryError(RuntimeError):
    """Raised when a message cannot be delivered after retries."""


def ensure_single_instance() -> None:
    """
    Enforce single instance via pidfile.
    """
    _write_pidfile(PIDFILE_PATH, os.getpid())


def _pid_running(check_pid: int) -> bool:
    """Return True if a process with *check_pid* appears to exist."""
    if check_pid <= 0:
        return False
    try:
        # Signal 0 performs the existence/permission check without
        # delivering a signal.
        os.kill(check_pid, 0)
    except OSError as exc:
        # Any other errno (e.g. EPERM) is treated as "process exists".
        return exc.errno not in (errno.ESRCH, errno.EINVAL)
    return True


def _write_pidfile(path: str, pid: int) -> None:
    """Create *path* exclusively and write *pid* into it.

    If the file already exists and names a running process, the program
    exits with status 1. A stale file is removed and creation is retried.
    Any other OSError while creating the file is logged and ignored, so the
    program continues without a pidfile.
    """
    try:
        # Explicit mode: os.open() otherwise defaults to 0o777 (minus umask).
        fd = os.open(path, os.O_CREAT | os.O_EXCL | os.O_WRONLY, 0o644)
        with os.fdopen(fd, "w") as f:
            f.write(str(pid))
        atexit.register(lambda: _remove_pidfile(path))
        logger.info("Created pidfile at %s", path)
    except FileExistsError:
        try:
            with open(path, "r") as f:
                existing_pid = int(f.read().strip() or 0)
        except Exception:
            # Unreadable or garbage content is treated as a stale pidfile.
            existing_pid = 0

        if existing_pid > 0 and _pid_running(existing_pid):
            logger.error(
                "Pidfile %s exists and process %s is running; exiting.", path, existing_pid
            )
            sys.exit(1)

        try:
            os.remove(path)
            logger.warning("Stale pidfile %s found; replacing.", path)
            _write_pidfile(path, pid)
        except Exception as exc:
            logger.error("Failed to replace stale pidfile %s: %s", path, exc)
            sys.exit(1)
    except OSError as exc:
        logger.warning("Could not create pidfile %s: %s; continuing without pidfile.", path, exc)


def _remove_pidfile(path: str) -> None:
    """Best-effort removal of the pidfile; registered to run at exit."""
    try:
        os.remove(path)
    except OSError:
        # Already gone or not removable; nothing useful to do at exit.
        pass


def load_last_uid(state_file: str) -> Optional[int]:
    """Return the last processed UID stored in *state_file*.

    Returns None when the file does not exist, cannot be read or parsed, or
    does not contain an integer ``last_uid``.
    """
    try:
        with open(state_file, "r") as f:
            data = json.load(f)
    except FileNotFoundError:
        return None
    except Exception as e:
        logger.warning("Failed to read state file: %s", e)
        return None

    if not isinstance(data, dict):
        logger.warning("Failed to read state file: %s", "top-level JSON value is not an object")
        return None

    last_uid = data.get("last_uid")
    if last_uid is None:
        return None
    # bool is a subclass of int, so it has to be excluded explicitly.
    if isinstance(last_uid, bool) or not isinstance(last_uid, int):
        logger.warning("Failed to read state file: %s", f"last_uid is not an integer: {last_uid!r}")
        return None
    return last_uid


def save_last_uid(state_file: str, uid: int) -> None:
    """Persist *uid* as the last processed UID.

    The value is written to ``<state_file>.tmp`` and then moved over
    *state_file* with os.replace, so the file is swapped in one step.
    The parent directory is created if needed.
    """
    state_dir = os.path.dirname(state_file)
    if state_dir:
        os.makedirs(state_dir, exist_ok=True)
    tmp = f"{state_file}.tmp"
    with open(tmp, "w") as f:
        json.dump({"last_uid": uid}, f)
    os.replace(tmp, state_file)


def _close_imap_quietly(imap: imaplib.IMAP4) -> None:
    """Log out of *imap*, falling back to closing the socket; never raises."""
    try:
        imap.logout()
    except Exception:
        try:
            imap.shutdown()
        except Exception:
            logger.debug("Failed to close IMAP connection cleanly.")


def _folder_name(entry: object) -> Optional[str]:
    """Extract the mailbox name from one item of an ``IMAP4.list()`` response.

    Returns None for entries that carry no name (imaplib yields ``None`` for
    an empty listing).
    """
    if isinstance(entry, tuple):
        # Literal form: (b'<flags> "/" {n}', b'<name>').
        entry = entry[-1]
    if not isinstance(entry, bytes):
        return None
    decoded = entry.decode("utf-8", errors="ignore")
    logger.debug("  %s", decoded)
    return decoded.split(' "/" ')[-1].strip('"')


def _open_imap_session(imap: imaplib.IMAP4, config: Config) -> None:
    """Run STARTTLS, login, folder check and SELECT on a fresh connection."""
    # Upgrade to TLS (verification can be disabled for local Proton Bridge).
    ssl_context = ssl.create_default_context()
    if not config.imap_tls_verify:
        # check_hostname must be cleared before verify_mode may be CERT_NONE.
        ssl_context.check_hostname = False
        ssl_context.verify_mode = ssl.CERT_NONE
    imap.starttls(ssl_context=ssl_context)

    logger.info("Logging in to IMAP ...")
    try:
        imap.login(config.imap_user, config.imap_password)
    except imaplib.IMAP4.error as exc:
        raise FatalConfigError(f"IMAP login failed: {exc}") from exc

    logger.debug("Listing available IMAP folders...")
    typ, folders = imap.list()
    if typ != "OK" or folders is None:
        raise FatalConfigError(f"Failed to list IMAP folders: {typ}")
    names = (_folder_name(entry) for entry in folders)
    available_folders = {name for name in names if name is not None}
    if config.imap_folder not in available_folders:
        raise FatalConfigError(f"IMAP folder {config.imap_folder!r} not found; aborting.")

    logger.info("Selecting folder %r ...", config.imap_folder)
    typ, _data = imap.select(config.imap_folder)
    if typ != "OK":
        raise FatalConfigError(f"IMAP select {config.imap_folder} failed: {typ}")


def connect_imap(config: Config) -> imaplib.IMAP4:
    """Connect, STARTTLS, log in and select the configured folder.

    Returns:
        An authenticated IMAP4 connection with ``config.imap_folder`` selected.

    Raises:
        FatalConfigError: if the TCP connection fails, login is rejected,
            folders cannot be listed, or the folder is missing or cannot be
            selected.

    Other exceptions (e.g. from STARTTLS) propagate unchanged. In every
    failure case after the TCP connection was made, the connection is closed
    before the exception leaves this function.
    """
    # Proton Bridge usually uses STARTTLS on 1143. If you configured SSL-only, switch to IMAP4_SSL.
    logger.info("Connecting to IMAP %s:%s ...", config.imap_host, config.imap_port)
    try:
        imap = imaplib.IMAP4(config.imap_host, config.imap_port)
    except OSError as exc:  # includes ConnectionRefusedError
        raise FatalConfigError(
            f"IMAP connect failed to {config.imap_host}:{config.imap_port}: {exc}"
        ) from exc

    try:
        _open_imap_session(imap, config)
    except BaseException:
        # The caller never receives `imap` on failure, so close it here.
        _close_imap_quietly(imap)
        raise
    return imap


def initialize_state_if_needed(imap: imaplib.IMAP4, state_file: str) -> None:
    """
    On first run (no state file), set last_uid to the current max UID,
    so we don't reprocess the entire mailbox.
    """
    if load_last_uid(state_file) is not None:
        return

    logger.info("No state file found; initializing from current mailbox state...")
    typ, data = imap.uid("search", None, "ALL")
    if typ != "OK":
        logger.warning("UID SEARCH ALL failed; starting from scratch (no last_uid).")
        save_last_uid(state_file, 0)
        return

    uids = parse_uid_list(data)
    if not uids:
        logger.info("Mailbox is empty; starting from UID 0.")
        save_last_uid(state_file, 0)
    else:
        max_uid = max(uids)
        logger.info("Mailbox has existing messages; starting after UID %s.", max_uid)
        save_last_uid(state_file, max_uid)


def parse_uid_list(data: Iterable[bytes]) -> List[int]:
    """
    IMAP UID SEARCH returns something like [b'1 2 3 4'].
    Return list of ints.

    Only the first item of *data* is inspected; an empty or None *data*, or
    a None first item, yields an empty list.
    """
    first = next(iter(data or ()), None)
    if first is None:
        return []
    raw = first.decode("ascii", errors="ignore")
    return [int(token) for token in raw.split() if token.isdigit()]


def filter_new_uids(uids: Iterable[int], last_uid: int) -> List[int]:
    """Return the sorted, de-duplicated UIDs strictly greater than *last_uid*.

    A ``UID <n>:*`` search always matches the mailbox's highest UID, even
    when <n> is larger than it (RFC 3501, section 6.4.8), so the search
    result can contain an already-processed message that must be dropped.
    """
    return sorted({uid for uid in uids if uid > last_uid})


def fetch_message(imap: imaplib.IMAP4, uid: int) -> Message:
    """Fetch the full RFC822 message for *uid* and parse it.

    The fetch is tried up to three times with a short back-off between
    tries.

    Raises:
        RuntimeError: if every attempt fails; the last error is chained.
    """
    last_exc: Optional[Exception] = None
    for attempt in range(1, _MAX_ATTEMPTS + 1):
        try:
            typ, fetch_data = imap.uid("fetch", str(uid), "(RFC822)")
            if typ != "OK":
                raise RuntimeError(f"FETCH returned {typ}")

            # fetch_data is like [(b'UID RFC822 {bytes}', b'...raw message...'), b')']
            for part in fetch_data:
                if isinstance(part, tuple) and len(part) == 2:
                    return message_from_bytes(part[1])
            raise RuntimeError(f"Unexpected FETCH response shape: {fetch_data}")
        except Exception as exc:
            last_exc = exc
            logger.warning("FETCH attempt %s for UID %s failed: %s", attempt, uid, exc)
            # No point in waiting when there is no further attempt.
            if attempt < _MAX_ATTEMPTS:
                time.sleep(_retry_delay(attempt))

    raise RuntimeError(f"Failed to fetch UID {uid} after retries: {last_exc}") from last_exc


def extract_recipients(msg: Message) -> List[str]:
    """Collect envelope recipients from the To, Cc and Bcc headers.

    Addresses without "@" are dropped and duplicates are removed
    case-insensitively, keeping the first spelling seen.

    Side effect: the Bcc header is removed from *msg*.
    """
    candidates = [
        addr
        for header in ("To", "Cc", "Bcc")
        for _name, addr in getaddresses(msg.get_all(header, []))
        if addr
    ]

    # Prevent leaking Bcc headers downstream.
    if "Bcc" in msg:
        del msg["Bcc"]

    unique: List[str] = []
    seen = set()
    for addr in candidates:
        key = addr.lower()
        if "@" not in addr or key in seen:
            continue
        seen.add(key)
        unique.append(addr)
    return unique


def deliver_to_smtp(msg: Message, config: Config) -> None:
    """
    Inject the raw RFC822 message into SMTP using original headers.

    The envelope sender comes from the From header ("unknown@localhost" if
    it has no address); the recipients come from extract_recipients(), which
    also strips Bcc from *msg*. A message without recipients is skipped with
    a warning. Delivery is tried up to three times.

    Raises:
        DeliveryError: if every attempt fails; the last error is chained.
    """
    _sender_name, sender_addr = parseaddr(msg.get("From", ""))
    sender_addr = sender_addr or "unknown@localhost"
    recipients = extract_recipients(msg)

    if not recipients:
        logger.warning("No recipients found in message; skipping delivery.")
        return

    logger.info("Sending from %s to %s", sender_addr, recipients)
    if logger.isEnabledFor(logging.DEBUG):
        try:
            raw_bytes = msg.as_bytes()
            logger.debug("--- Begin message to SMTP ---")
            logger.debug(raw_bytes.decode("utf-8", errors="replace"))
            logger.debug("--- End message to SMTP ---")
        except Exception as e:
            logger.debug("Failed to dump message contents: %s", e)

    logger.info("Connecting to SMTP server...")
    last_exc: Optional[Exception] = None
    for attempt in range(1, _MAX_ATTEMPTS + 1):
        try:
            with smtplib.SMTP(config.smtp_host, config.smtp_port, timeout=10) as smtp:
                smtp.sendmail(sender_addr, recipients, msg.as_bytes())
            logger.info("Delivered message to SMTP.")
            return
        except Exception as exc:
            last_exc = exc
            logger.warning("SMTP send attempt %s failed: %s", attempt, exc)
            # No point in waiting when there is no further attempt.
            if attempt < _MAX_ATTEMPTS:
                time.sleep(_retry_delay(attempt))

    raise DeliveryError(f"Failed to deliver message after retries: {last_exc}") from last_exc


def _search_new_uids(imap: imaplib.IMAP4, last_uid: int) -> Optional[List[int]]:
    """Search for UIDs above *last_uid*, retrying up to three times.

    Returns the sorted new UIDs, or None if the search never returned OK.
    """
    search_criteria = f"(UID {last_uid + 1}:*)"
    logger.info("Checking for new messages with %s ...", search_criteria)

    typ = None
    for attempt in range(1, _MAX_ATTEMPTS + 1):
        typ, data = imap.uid("search", None, search_criteria)
        if typ == "OK":
            return filter_new_uids(parse_uid_list(data), last_uid)
        logger.warning("UID SEARCH attempt %s failed: %s", attempt, typ)
        # Also sleeps after the final failure: the caller reconnects right
        # away, and this is the only delay before it does.
        time.sleep(_retry_delay(attempt))

    logger.warning("UID SEARCH failed after retries: %s", typ)
    return None


def _process_uid(imap: imaplib.IMAP4, uid: int, config: Config) -> bool:
    """Fetch and deliver one message; return True if it was delivered.

    The stored last_uid advances on success and on DeliveryError. On any
    other error the state is left untouched here, although a later
    successful UID in the same cycle will still advance it past this one.
    """
    logger.info("Processing UID %s ...", uid)
    try:
        msg = fetch_message(imap, uid)
        deliver_to_smtp(msg, config)
        save_last_uid(config.state_file, uid)
    except DeliveryError as e:
        logger.error("Delivery error for UID %s: %s", uid, e)
        save_last_uid(config.state_file, uid)
        logger.info("Advanced state past failed UID %s to avoid reprocessing.", uid)
        return False
    except Exception as e:
        logger.exception("Error processing UID %s: %s", uid, e)
        return False

    logger.info("Finished UID %s.", uid)
    return True


def poll_loop(config: Config) -> None:
    """Run the tunnel until interrupted.

    The outer loop (re)connects to IMAP; the inner loop polls for new UIDs,
    forwards each message to SMTP, logs a metrics line and sleeps for
    ``config.poll_interval`` seconds. A failed UID SEARCH ends the inner
    loop and triggers a reconnect. KeyboardInterrupt ends the function;
    FatalConfigError/ValueError exit the process with status 1; any other
    error is logged and followed by a reconnect after 10 seconds.
    """
    while True:
        imap = None
        try:
            imap = connect_imap(config)
            initialize_state_if_needed(imap, config.state_file)
            session_stats = {"processed": 0, "delivered": 0, "failed": 0}

            while True:
                cycle_start = time.monotonic()
                cycle_stats = {"processed": 0, "delivered": 0, "failed": 0}
                last_uid = load_last_uid(config.state_file) or 0

                uids = _search_new_uids(imap, last_uid)
                if uids is None:
                    break  # leave the session; the outer loop reconnects

                if not uids:
                    logger.info("No new messages.")
                else:
                    logger.info("Found %s new message(s): %s", len(uids), uids)
                    for uid in uids:
                        outcome = "delivered" if _process_uid(imap, uid, config) else "failed"
                        for stats in (cycle_stats, session_stats):
                            stats["processed"] += 1
                            stats[outcome] += 1

                logger.info(
                    "metrics cycle_processed=%s cycle_delivered=%s cycle_failed=%s "
                    "session_processed=%s session_delivered=%s session_failed=%s "
                    "last_uid=%s cycle_duration_ms=%.0f",
                    cycle_stats["processed"],
                    cycle_stats["delivered"],
                    cycle_stats["failed"],
                    session_stats["processed"],
                    session_stats["delivered"],
                    session_stats["failed"],
                    load_last_uid(config.state_file) or 0,
                    (time.monotonic() - cycle_start) * 1000,
                )

                logger.info("Sleeping %s seconds...", config.poll_interval)
                time.sleep(config.poll_interval)

        except KeyboardInterrupt:
            logger.info("Interrupted by user, exiting.")
            break
        except (FatalConfigError, ValueError) as e:
            logger.error("Fatal configuration error: %s", e)
            sys.exit(1)
        except Exception as e:
            logger.exception("Top-level error: %s; reconnecting after delay...", e)
            time.sleep(10)
        finally:
            if imap is not None:
                try:
                    imap.logout()
                except Exception:
                    logger.debug("Failed to logout cleanly from IMAP.")


def main() -> None:
    """Script entry point: take the pidfile, load configuration, run the loop."""
    ensure_single_instance()
    try:
        config = Config.from_env()
    except ValueError as exc:
        # from_env() runs outside poll_loop's handlers, so report it here
        # instead of letting a traceback escape.
        logger.error("Fatal configuration error: %s", exc)
        sys.exit(1)
    poll_loop(config)


if __name__ == "__main__":
    main()