From c10b9e634dd4ce647d1a2dcd07f4029a31f82d52 Mon Sep 17 00:00:00 2001 From: KacperLa Date: Wed, 26 Nov 2025 14:48:58 -0500 Subject: [PATCH] Add email_tunnel script and README for Proton Bridge IMAP integration --- README.md | 62 +++++++ email_tunnel.py | 452 ++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 514 insertions(+) create mode 100644 README.md create mode 100755 email_tunnel.py diff --git a/README.md b/README.md new file mode 100644 index 0000000..2f8766a --- /dev/null +++ b/README.md @@ -0,0 +1,62 @@ +# email_tunnel + +Poll Proton Bridge via IMAP and inject messages into a local SMTP server (e.g., Haraka). The script is designed for headless, continuous operation with basic safety/observability baked in. + +## What it does +- Connects to Proton Bridge IMAP via STARTTLS and selects a required folder. +- Polls for new messages by UID, fetches each, and relays it verbatim to a local SMTP server using the original headers. +- Strips `Bcc`, deduplicates recipients, and skips malformed addresses. +- Tracks progress in a JSON state file so messages are processed once, even across restarts. +- Retries IMAP fetch/search and SMTP delivery with bounded attempts/backoff; advances state past permanent delivery failures to avoid loops. +- Enforces single-instance execution via a pidfile lock. +- Emits scrape-friendly INFO logs each cycle with counts, last UID, and cycle duration. + +## Requirements +- Python 3.x +- Proton Bridge IMAP reachable at the configured host/port +- Local SMTP server reachable (e.g., Haraka bound to a non-25 port) + +## Configuration (environment variables) +- `IMAP_IP` (required): Proton Bridge IMAP host. +- `IMAP_PORT` (required): Proton Bridge IMAP port (typically 1143). +- `IMAP_USER` (required): IMAP username. +- `IMAP_PASSWORD` (required): IMAP password. +- `IMAP_FOLDER` (required): IMAP folder to poll; must exist (no fallback). +- `IMAP_TLS_VERIFY` (default `1`): Set to `0`/`false` to disable cert/hostname checks (dev only). +- `SMTP_IP` (default `127.0.0.1`): SMTP host to deliver to. +- `SMTP_PORT` (default `2525`): SMTP port to deliver to. +- `POLL_INTERVAL` (default `15`): Seconds between mailbox polls. +- `STATE_FILE` (default `./state.json`): Path to persist last processed UID. +- `PIDFILE` (default `/tmp/email_tunnel.pid`): Pidfile for single-instance lock. +- `LOG_LEVEL` (default `INFO`): Standard logging levels (`DEBUG`, `INFO`, etc.). + +## Usage +```bash +export IMAP_IP=127.0.0.1 +export IMAP_PORT=1143 +export IMAP_USER=... +export IMAP_PASSWORD=... +export IMAP_FOLDER=loomio +export SMTP_IP=127.0.0.1 +export SMTP_PORT=2525 +python3 email_tunnel.py +``` + +Run under a process supervisor (systemd, supervisord, etc.) so restarts and log rotation are handled. + +## Operational behavior +- State tracking: `STATE_FILE` records the last successfully handled UID. It is only advanced after successful delivery, or after a logged `DeliveryError` to avoid reprocessing a permanently failed message. +- Retries: IMAP SEARCH/FETCH and SMTP SEND are retried up to 3 times with small backoff. Persistent failures log and move on. +- Single instance: pidfile lock prevents multiple concurrent runs; stale pidfiles are replaced if the recorded PID is not running. +- Metrics logging: every poll cycle emits a single INFO line with `metrics` prefix and key/value pairs (`cycle_processed`, `cycle_delivered`, `cycle_failed`, `session_*`, `last_uid`, `cycle_duration_ms`). +- Missing state: if `STATE_FILE` is absent on startup, the script initializes it to the current max UID (or 0 for an empty mailbox) to avoid reprocessing the entire mailbox. + +## Notes and safety +- Leave `IMAP_TLS_VERIFY` enabled in production; disable only for trusted, local endpoints. +- Ensure the `STATE_FILE` and `PIDFILE` paths are writable by the service user. +- If delivering to a server that rejects certain recipients, the script will log the error, advance the UID, and continue. + +## Troubleshooting +- Increase verbosity with `LOG_LEVEL=DEBUG` to see raw message dumps and IMAP folder listings. +- If the script exits immediately, verify required env vars and that `IMAP_FOLDER` exists on the server. +- For pidfile conflicts, check the process holding the PID or remove a stale pidfile if no process is running.*** diff --git a/email_tunnel.py b/email_tunnel.py new file mode 100755 index 0000000..3e640dd --- /dev/null +++ b/email_tunnel.py @@ -0,0 +1,452 @@ +#!/usr/bin/env python3 +""" +Poll Proton Bridge via IMAP and inject new messages into a local SMTP server. + +- Assumes Proton Bridge IMAP is running locally. +- Assumes an SMTP server is listening on a local port (e.g. 2525). +""" + +import imaplib +import json +import logging +import os +import smtplib +import ssl +import sys +import time +import atexit +import errno +from dataclasses import dataclass +from email import message_from_bytes +from email.message import Message +from email.utils import getaddresses, parseaddr +from typing import Iterable, List, Optional + +LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper() +logging.basicConfig( + level=LOG_LEVEL, + format="[email_tunnel] %(asctime)s %(levelname)s %(message)s", +) +logger = logging.getLogger("email_tunnel") +PIDFILE_PATH = os.getenv("PIDFILE", "/tmp/email_tunnel.pid") + + +@dataclass(frozen=True) +class Config: + imap_host: str + imap_port: int + imap_user: str + imap_password: str + imap_folder: str + imap_tls_verify: bool + smtp_host: str + smtp_port: int + poll_interval: int + state_file: str + + @classmethod + def from_env(cls) -> "Config": + missing: List[str] = [] + + def require(name: str) -> str: + value = os.getenv(name) + if not value: + missing.append(name) + return value or "" + + imap_host = require("IMAP_IP") + imap_port_raw = require("IMAP_PORT") + smtp_host = require("SMTP_IP") + smtp_port_raw = require("SMTP_PORT") + poll_interval_raw = require("POLL_INTERVAL") + imap_user = require("IMAP_USER") + imap_password = require("IMAP_PASSWORD") + imap_folder = require("IMAP_FOLDER") + imap_tls_verify_raw = require("IMAP_TLS_VERIFY") + + if missing: + raise ValueError(f"Missing required environment variables: {', '.join(missing)}") + + try: + imap_port = int(imap_port_raw) + except ValueError: + raise ValueError("IMAP_PORT must be an integer") + + try: + smtp_port = int(smtp_port_raw) + except ValueError: + raise ValueError("SMTP_PORT must be an integer") + + try: + poll_interval = int(poll_interval_raw) + except ValueError: + raise ValueError("POLL_INTERVAL must be an integer") + + if len(imap_host) == 0: + raise ValueError("IMAP_IP must be a valid string") + if imap_port <= 0: + raise ValueError("IMAP_PORT must be a positive integer") + if smtp_port <= 0: + raise ValueError("SMTP_PORT must be a positive integer") + if poll_interval <= 0: + raise ValueError("POLL_INTERVAL must be a positive integer") + if len(imap_user) == 0: + raise ValueError("IMAP_USER must be a valid string") + if len(imap_password) == 0: + raise ValueError("IMAP_PASSWORD must be a valid string") + if len(imap_folder) == 0: + raise ValueError("IMAP_FOLDER must be a valid string") + + return cls( + imap_host=imap_host, + imap_port=imap_port, + imap_user=imap_user, + imap_password=imap_password, + imap_folder=imap_folder, + imap_tls_verify=imap_tls_verify_raw not in ("0", "false", "False"), + smtp_host=smtp_host, + smtp_port=smtp_port, + poll_interval=poll_interval, + state_file=os.getenv("STATE_FILE", "./state.json"), + ) + + +class FatalConfigError(RuntimeError): + """Raised for configuration problems that require immediate exit.""" + + +class DeliveryError(RuntimeError): + """Raised when a message cannot be delivered after retries.""" + + +def ensure_single_instance() -> None: + """ + Enforce single instance via pidfile. + """ + pid = os.getpid() + _write_pidfile(PIDFILE_PATH, pid) + + +def _pid_running(check_pid: int) -> bool: + if check_pid <= 0: + return False + try: + os.kill(check_pid, 0) + except OSError as exc: + if exc.errno in (errno.ESRCH, errno.EINVAL): + return False + return True + else: + return True + + +def _write_pidfile(path: str, pid: int) -> None: + try: + fd = os.open(path, os.O_CREAT | os.O_EXCL | os.O_WRONLY) + with os.fdopen(fd, "w") as f: + f.write(str(pid)) + atexit.register(lambda: _remove_pidfile(path)) + logger.info("Created pidfile at %s", path) + except FileExistsError: + try: + with open(path, "r") as f: + existing_pid = int(f.read().strip() or 0) + except Exception: + existing_pid = 0 + + if existing_pid > 0 and _pid_running(existing_pid): + logger.error( + "Pidfile %s exists and process %s is running; exiting.", path, existing_pid + ) + sys.exit(1) + else: + try: + os.remove(path) + logger.warning("Stale pidfile %s found; replacing.", path) + _write_pidfile(path, pid) + return + except Exception as exc: + logger.error("Failed to replace stale pidfile %s: %s", path, exc) + sys.exit(1) + except OSError as exc: + logger.warning("Could not create pidfile %s: %s; continuing without pidfile.", path, exc) + + +def _remove_pidfile(path: str) -> None: + try: + if os.path.exists(path): + os.remove(path) + except Exception: + pass + + +def load_last_uid(state_file: str) -> Optional[int]: + if not os.path.exists(state_file): + return None + try: + with open(state_file, "r") as f: + data = json.load(f) + return data.get("last_uid") + except Exception as e: + logger.warning("Failed to read state file: %s", e) + return None + + +def save_last_uid(state_file: str, uid: int) -> None: + state_dir = os.path.dirname(state_file) + if state_dir: + os.makedirs(state_dir, exist_ok=True) + + tmp = f"{state_file}.tmp" + with open(tmp, "w") as f: + json.dump({"last_uid": uid}, f) + os.replace(tmp, state_file) + + +def connect_imap(config: Config) -> imaplib.IMAP4: + # Proton Bridge usually uses STARTTLS on 1143. If you configured SSL-only, switch to IMAP4_SSL. + logger.info("Connecting to IMAP %s:%s ...", config.imap_host, config.imap_port) + imap = imaplib.IMAP4(config.imap_host, config.imap_port) + + # Upgrade to TLS (verification can be disabled for local Proton Bridge). + ssl_context = ssl.create_default_context() + if not config.imap_tls_verify: + ssl_context.check_hostname = False + ssl_context.verify_mode = ssl.CERT_NONE + imap.starttls(ssl_context=ssl_context) + + logger.info("Logging in to IMAP ...") + imap.login(config.imap_user, config.imap_password) + + logger.debug("Listing available IMAP folders...") + typ, folders = imap.list() + available_folders = set() + if typ != "OK" or folders is None: + raise FatalConfigError(f"Failed to list IMAP folders: {typ}") + + for folder in folders: + decoded = folder.decode("utf-8", errors="ignore") + available_folders.add(decoded.split(' "/" ')[-1].strip('"')) + logger.debug(" %s", decoded) + if config.imap_folder not in available_folders: + raise FatalConfigError(f"IMAP folder {config.imap_folder!r} not found; aborting.") + + logger.info("Selecting folder %r ...", config.imap_folder) + typ, _data = imap.select(config.imap_folder) + if typ != "OK": + raise FatalConfigError(f"IMAP select {config.imap_folder} failed: {typ}") + return imap + + +def initialize_state_if_needed(imap: imaplib.IMAP4, state_file: str) -> None: + """ + On first run (no state file), set last_uid to the current max UID, + so we don't reprocess the entire mailbox. + """ + if load_last_uid(state_file) is not None: + return + + logger.info("No state file found; initializing from current mailbox state...") + typ, data = imap.uid("search", None, "ALL") + if typ != "OK": + logger.warning("UID SEARCH ALL failed; starting from scratch (no last_uid).") + save_last_uid(state_file, 0) + return + + uids = parse_uid_list(data) + if not uids: + logger.info("Mailbox is empty; starting from UID 0.") + save_last_uid(state_file, 0) + else: + max_uid = max(uids) + logger.info("Mailbox has existing messages; starting after UID %s.", max_uid) + save_last_uid(state_file, max_uid) + + +def parse_uid_list(data: Iterable[bytes]) -> List[int]: + """ + IMAP UID SEARCH returns something like [b'1 2 3 4']. + Return list of ints. + """ + if not data or data[0] is None: + return [] + raw = data[0].decode("ascii", errors="ignore").strip() + if not raw: + return [] + return [int(x) for x in raw.split() if x.isdigit()] + + +def fetch_message(imap: imaplib.IMAP4, uid: int) -> Message: + attempt = 0 + last_exc: Optional[Exception] = None + while attempt < 3: + attempt += 1 + try: + typ, fetch_data = imap.uid("fetch", str(uid), "(RFC822)") + if typ != "OK": + raise RuntimeError(f"FETCH returned {typ}") + + # fetch_data is like [(b'UID RFC822 {bytes}', b'...raw message...'), b')'] + for part in fetch_data: + if isinstance(part, tuple) and len(part) == 2: + return message_from_bytes(part[1]) + raise RuntimeError(f"Unexpected FETCH response shape: {fetch_data}") + except Exception as exc: + last_exc = exc + logger.warning("FETCH attempt %s for UID %s failed: %s", attempt, uid, exc) + time.sleep(min(5, attempt * 2)) + raise RuntimeError(f"Failed to fetch UID {uid} after retries: {last_exc}") + + +def extract_recipients(msg: Message) -> List[str]: + recipients: List[str] = [] + for header in ("To", "Cc", "Bcc"): + for _name, addr in getaddresses(msg.get_all(header, [])): + if addr: + recipients.append(addr) + # Prevent leaking Bcc headers downstream. + if "Bcc" in msg: + del msg["Bcc"] + cleaned = [] + seen = set() + for addr in recipients: + if "@" not in addr: + continue + addr_lower = addr.lower() + if addr_lower in seen: + continue + seen.add(addr_lower) + cleaned.append(addr) + return cleaned + + +def deliver_to_smtp(msg: Message, config: Config) -> None: + """ + Inject the raw RFC822 message into SMTP using original headers. + """ + from_header = msg.get("From", "") + _sender_name, sender_addr = parseaddr(from_header) + sender_addr = sender_addr or "unknown@localhost" + + recipients = extract_recipients(msg) + if not recipients: + logger.warning("No recipients found in message; skipping delivery.") + return + logger.info("Sending from %s to %s", sender_addr, recipients) + + if logger.isEnabledFor(logging.DEBUG): + try: + raw_bytes = msg.as_bytes() + logger.debug("--- Begin message to SMTP ---") + logger.debug(raw_bytes.decode("utf-8", errors="replace")) + logger.debug("--- End message to SMTP ---") + except Exception as e: + logger.debug("Failed to dump message contents: %s", e) + + logger.info("Connecting to SMTP server...") + attempt = 0 + last_exc: Optional[Exception] = None + while attempt < 3: + attempt += 1 + try: + with smtplib.SMTP(config.smtp_host, config.smtp_port, timeout=10) as smtp: + smtp.sendmail(sender_addr, recipients, msg.as_bytes()) + logger.info("Delivered message to SMTP.") + return + except Exception as exc: + last_exc = exc + logger.warning("SMTP send attempt %s failed: %s", attempt, exc) + time.sleep(min(5, attempt * 2)) + raise DeliveryError(f"Failed to deliver message after retries: {last_exc}") + + +def poll_loop(config: Config) -> None: + while True: + imap = None + try: + imap = connect_imap(config) + initialize_state_if_needed(imap, config.state_file) + session_stats = {"processed": 0, "delivered": 0, "failed": 0} + while True: + cycle_start = time.time() + cycle_stats = {"processed": 0, "delivered": 0, "failed": 0} + last_uid = load_last_uid(config.state_file) or 0 + search_criteria = f"(UID {last_uid + 1}:*)" + logger.info("Checking for new messages with %s ...", search_criteria) + + search_attempt = 0 + data = [] + while search_attempt < 3: + search_attempt += 1 + typ, data = imap.uid("search", None, search_criteria) + if typ == "OK": + break + logger.warning("UID SEARCH attempt %s failed: %s", search_attempt, typ) + time.sleep(min(5, search_attempt * 2)) + if typ != "OK": + logger.warning("UID SEARCH failed after retries: %s", typ) + break + + uids = parse_uid_list(data) + if not uids: + logger.info("No new messages.") + else: + logger.info("Found %s new message(s): %s", len(uids), uids) + for uid in sorted(uids): + cycle_stats["processed"] += 1 + session_stats["processed"] += 1 + logger.info("Processing UID %s ...", uid) + try: + msg = fetch_message(imap, uid) + deliver_to_smtp(msg, config) + save_last_uid(config.state_file, uid) + logger.info("Finished UID %s.", uid) + cycle_stats["delivered"] += 1 + session_stats["delivered"] += 1 + except DeliveryError as e: + cycle_stats["failed"] += 1 + session_stats["failed"] += 1 + logger.error("Delivery error for UID %s: %s", uid, e) + save_last_uid(config.state_file, uid) + logger.info("Advanced state past failed UID %s to avoid reprocessing.", uid) + except Exception as e: + logger.exception("Error processing UID %s: %s", uid, e) + cycle_stats["failed"] += 1 + session_stats["failed"] += 1 + + logger.info( + "metrics cycle_processed=%s cycle_delivered=%s cycle_failed=%s " + "session_processed=%s session_delivered=%s session_failed=%s " + "last_uid=%s cycle_duration_ms=%.0f", + cycle_stats["processed"], + cycle_stats["delivered"], + cycle_stats["failed"], + session_stats["processed"], + session_stats["delivered"], + session_stats["failed"], + load_last_uid(config.state_file) or 0, + (time.time() - cycle_start) * 1000, + ) + logger.info("Sleeping %s seconds...", config.poll_interval) + time.sleep(config.poll_interval) + + except KeyboardInterrupt: + logger.info("Interrupted by user, exiting.") + break + except (FatalConfigError, ValueError) as e: + logger.error("Fatal configuration error: %s", e) + sys.exit(1) + except Exception as e: + logger.exception("Top-level error: %s; reconnecting after delay...", e) + time.sleep(10) + finally: + if imap is not None: + try: + imap.logout() + except Exception: + logger.debug("Failed to logout cleanly from IMAP.") + + +if __name__ == "__main__": + ensure_single_instance() + poll_loop(Config.from_env())