local_email_bridge/email_tunnel.py

461 lines
16 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/env python3
"""
Poll Proton Bridge via IMAP and inject new messages into a local SMTP server.
- Assumes Proton Bridge IMAP is running locally.
- Assumes an SMTP server is listening on a local port (e.g. 2525).
"""
import imaplib
import json
import logging
import os
import smtplib
import ssl
import sys
import time
import atexit
import errno
from dataclasses import dataclass
from email import message_from_bytes
from email.message import Message
from email.utils import getaddresses, parseaddr
from typing import Iterable, List, Optional
# Log verbosity is taken from the LOG_LEVEL environment variable (default
# "INFO") and upper-cased before being handed to the logging module.
LOG_LEVEL = os.environ.get("LOG_LEVEL", "INFO").upper()
logging.basicConfig(
    format="[email_tunnel] %(asctime)s %(levelname)s %(message)s",
    level=LOG_LEVEL,
)
# Logger shared by the functions in this module.
logger = logging.getLogger("email_tunnel")
# Path of the single-instance pidfile; the PIDFILE variable overrides it.
PIDFILE_PATH = os.environ.get("PIDFILE", "/tmp/email_tunnel.pid")
@dataclass(frozen=True)
class Config:
    """Immutable runtime settings for the IMAP-to-SMTP tunnel.

    Instances are normally built with :meth:`from_env`, which reads every
    value from environment variables.
    """

    imap_host: str  # from IMAP_IP
    imap_port: int  # from IMAP_PORT
    imap_user: str  # from IMAP_USER
    imap_password: str  # from IMAP_PASSWORD
    imap_folder: str  # from IMAP_FOLDER
    imap_tls_verify: bool  # from IMAP_TLS_VERIFY
    smtp_host: str  # from SMTP_IP
    smtp_port: int  # from SMTP_PORT
    poll_interval: int  # from POLL_INTERVAL
    state_file: str  # from STATE_FILE (optional, defaults to "./state.json")

    @classmethod
    def from_env(cls) -> "Config":
        """Build a ``Config`` from environment variables.

        Required variables: IMAP_IP, IMAP_PORT, SMTP_IP, SMTP_PORT,
        POLL_INTERVAL, IMAP_USER, IMAP_PASSWORD, IMAP_FOLDER and
        IMAP_TLS_VERIFY. STATE_FILE is optional.

        IMAP_TLS_VERIFY disables certificate verification when it is one of
        "0", "false", "no" or "off" (case-insensitive, surrounding whitespace
        ignored); any other non-empty value enables it.

        Raises:
            ValueError: if a required variable is unset or empty (all missing
                names are reported together), if a numeric variable is not a
                positive integer, or if a port exceeds 65535.
        """
        missing: List[str] = []

        def require(name: str) -> str:
            # Record the name instead of raising immediately so that every
            # missing variable can be reported in a single error.
            value = os.getenv(name)
            if not value:
                missing.append(name)
            return value or ""

        def positive_int(name: str, raw: str, upper: Optional[int] = None) -> int:
            try:
                number = int(raw)
            except ValueError:
                # "from None": the int() traceback adds nothing for the user.
                raise ValueError(f"{name} must be an integer") from None
            if number <= 0:
                raise ValueError(f"{name} must be a positive integer")
            if upper is not None and number > upper:
                raise ValueError(f"{name} must be at most {upper}")
            return number

        imap_host = require("IMAP_IP")
        imap_port_raw = require("IMAP_PORT")
        smtp_host = require("SMTP_IP")
        smtp_port_raw = require("SMTP_PORT")
        poll_interval_raw = require("POLL_INTERVAL")
        imap_user = require("IMAP_USER")
        imap_password = require("IMAP_PASSWORD")
        imap_folder = require("IMAP_FOLDER")
        imap_tls_verify_raw = require("IMAP_TLS_VERIFY")
        if missing:
            raise ValueError(
                f"Missing required environment variables: {', '.join(missing)}"
            )

        # Every string value is known to be non-empty past this point, so only
        # the numeric fields need further validation.
        imap_port = positive_int("IMAP_PORT", imap_port_raw, upper=65535)
        smtp_port = positive_int("SMTP_PORT", smtp_port_raw, upper=65535)
        poll_interval = positive_int("POLL_INTERVAL", poll_interval_raw)

        falsy_values = {"0", "false", "no", "off"}
        return cls(
            imap_host=imap_host,
            imap_port=imap_port,
            imap_user=imap_user,
            imap_password=imap_password,
            imap_folder=imap_folder,
            imap_tls_verify=imap_tls_verify_raw.strip().lower() not in falsy_values,
            smtp_host=smtp_host,
            smtp_port=smtp_port,
            poll_interval=poll_interval,
            state_file=os.getenv("STATE_FILE", "./state.json"),
        )
class FatalConfigError(RuntimeError):
    """Configuration problem that cannot be recovered from; the process should exit."""
class DeliveryError(RuntimeError):
    """A message could not be handed to SMTP even after retrying."""
def ensure_single_instance() -> None:
    """Claim the pidfile at ``PIDFILE_PATH`` for the current process.

    The work is delegated to ``_write_pidfile``, which is given this
    process's PID.
    """
    _write_pidfile(PIDFILE_PATH, os.getpid())
def _pid_running(check_pid: int) -> bool:
    """Return True if a process with ID ``check_pid`` appears to exist.

    The probe is ``os.kill(check_pid, 0)``. Non-positive PIDs are reported as
    not running without probing.
    """
    if check_pid <= 0:
        return False
    try:
        os.kill(check_pid, 0)
    except OSError as exc:
        # ESRCH / EINVAL count as "not running"; any other OSError is taken
        # to mean the process is there.
        return exc.errno not in (errno.ESRCH, errno.EINVAL)
    return True
def _write_pidfile(path: str, pid: int) -> None:
    """Create the pidfile at ``path`` containing ``pid``.

    The file is created exclusively. On success its removal is registered
    with ``atexit``.

    If the file already exists:
      * when it names a different PID that is still running, an error is
        logged and the process exits with status 1;
      * otherwise (unreadable content, dead PID, or this process's own PID)
        it is treated as stale, removed, and creation is retried. If the
        replacement fails, the process exits with status 1.

    Any other ``OSError`` while creating the file is logged as a warning and
    the function returns without a pidfile.
    """
    try:
        # Explicit mode: without it os.open() asks for 0o777 (before umask).
        fd = os.open(path, os.O_CREAT | os.O_EXCL | os.O_WRONLY, 0o644)
        with os.fdopen(fd, "w") as f:
            f.write(str(pid))
        atexit.register(_remove_pidfile, path)
        logger.info("Created pidfile at %s", path)
    except FileExistsError:
        try:
            with open(path, "r") as f:
                existing_pid = int(f.read().strip() or 0)
        except Exception:
            # Unreadable or non-numeric content: treat as stale.
            existing_pid = 0
        # A pidfile that already holds *our* PID cannot belong to another live
        # process: it is left over from an earlier run that had the same PID
        # (common after a container restart), so it must count as stale.
        owned_by_other = existing_pid > 0 and existing_pid != pid
        if owned_by_other and _pid_running(existing_pid):
            logger.error(
                "Pidfile %s exists and process %s is running; exiting.", path, existing_pid
            )
            sys.exit(1)
        try:
            os.remove(path)
            logger.warning("Stale pidfile %s found; replacing.", path)
            _write_pidfile(path, pid)
        except Exception as exc:
            logger.error("Failed to replace stale pidfile %s: %s", path, exc)
            sys.exit(1)
    except OSError as exc:
        logger.warning("Could not create pidfile %s: %s; continuing without pidfile.", path, exc)
def _remove_pidfile(path: str) -> None:
    """Delete the pidfile at ``path`` if present; any failure is ignored."""
    try:
        if not os.path.exists(path):
            return
        os.remove(path)
    except Exception:
        # Best-effort cleanup: errors are deliberately swallowed.
        return
def load_last_uid(state_file: str) -> Optional[int]:
    """Read the ``last_uid`` value stored in the JSON state file.

    Returns None when the file does not exist, when it cannot be read or
    parsed (a warning is logged), or when it has no ``last_uid`` key.
    """
    if os.path.exists(state_file):
        try:
            with open(state_file, "r") as handle:
                state = json.load(handle)
            return state.get("last_uid")
        except Exception as e:
            logger.warning("Failed to read state file: %s", e)
    return None
def save_last_uid(state_file: str, uid: int) -> None:
    """Persist ``uid`` as ``{"last_uid": uid}`` in the JSON state file.

    The parent directory is created when the path has one. The data is
    written to ``<state_file>.tmp`` first and then moved over the real file
    with ``os.replace``.
    """
    parent = os.path.dirname(state_file)
    if parent:
        os.makedirs(parent, exist_ok=True)
    scratch = f"{state_file}.tmp"
    with open(scratch, "w") as handle:
        json.dump({"last_uid": uid}, handle)
    os.replace(scratch, state_file)
def _quote_mailbox(name: str) -> str:
    """Return ``name`` as an IMAP quoted string (backslash and quote escaped)."""
    escaped = name.replace("\\", "\\\\").replace('"', '\\"')
    return f'"{escaped}"'


def _mailbox_name_from_list_line(line: str) -> str:
    """Extract the mailbox name from one decoded IMAP LIST response line."""
    import re  # local import: this module does not import re at file level

    # Shape: (flags) <delimiter> <name>, where the delimiter is NIL or a
    # quoted string and the name is either an atom or a quoted string.
    match = re.match(r'\([^)]*\)\s+(?:NIL|"(?:\\.|[^"\\])*")\s+(.+)$', line)
    if match is None:
        # Unrecognised shape: fall back to splitting on the "/" delimiter.
        return line.split(' "/" ')[-1].strip('"')
    name = match.group(1).strip()
    if len(name) >= 2 and name.startswith('"') and name.endswith('"'):
        name = re.sub(r"\\(.)", r"\1", name[1:-1])
    return name


def connect_imap(config: Config) -> imaplib.IMAP4:
    """Open an IMAP session and select the configured folder.

    Steps: plain connect, STARTTLS upgrade, login, check that the configured
    folder appears in the LIST response, then SELECT it.

    Returns:
        The logged-in ``imaplib.IMAP4`` object with the folder selected.

    Raises:
        FatalConfigError: if the connection cannot be opened, the login is
            rejected, the folder list cannot be obtained, the folder is not
            listed, or SELECT does not return OK.

    Other exceptions raised by imaplib/ssl propagate unchanged. In every
    failure case after the socket was opened, the connection is closed before
    the exception leaves this function.
    """
    # Proton Bridge usually uses STARTTLS on 1143. If you configured SSL-only, switch to IMAP4_SSL.
    logger.info("Connecting to IMAP %s:%s ...", config.imap_host, config.imap_port)
    try:
        imap = imaplib.IMAP4(config.imap_host, config.imap_port)
    except OSError as exc:  # includes ConnectionRefusedError
        raise FatalConfigError(
            f"IMAP connect failed to {config.imap_host}:{config.imap_port}: {exc}"
        ) from exc

    try:
        # Upgrade to TLS (verification can be disabled for local Proton Bridge).
        ssl_context = ssl.create_default_context()
        if not config.imap_tls_verify:
            ssl_context.check_hostname = False
            ssl_context.verify_mode = ssl.CERT_NONE
        imap.starttls(ssl_context=ssl_context)

        logger.info("Logging in to IMAP ...")
        try:
            imap.login(config.imap_user, config.imap_password)
        except imaplib.IMAP4.error as exc:
            raise FatalConfigError(f"IMAP login failed: {exc}") from exc

        logger.debug("Listing available IMAP folders...")
        typ, folders = imap.list()
        if typ != "OK" or folders is None:
            raise FatalConfigError(f"Failed to list IMAP folders: {typ}")
        available_folders = set()
        for entry in folders:
            if not entry:
                # imaplib yields None / empty items when nothing is listed.
                continue
            if isinstance(entry, tuple):
                # The name was sent as an IMAP literal: (header, literal).
                decoded = entry[-1].decode("utf-8", errors="ignore")
                available_folders.add(decoded)
            else:
                decoded = entry.decode("utf-8", errors="ignore")
                available_folders.add(_mailbox_name_from_list_line(decoded))
            logger.debug(" %s", decoded)
        if config.imap_folder not in available_folders:
            raise FatalConfigError(f"IMAP folder {config.imap_folder!r} not found; aborting.")

        logger.info("Selecting folder %r ...", config.imap_folder)
        # imaplib sends the mailbox argument verbatim, so a name containing
        # spaces must be quoted by the caller.
        typ, _data = imap.select(_quote_mailbox(config.imap_folder))
        if typ != "OK":
            raise FatalConfigError(f"IMAP select {config.imap_folder} failed: {typ}")
        return imap
    except BaseException:
        # The caller never receives the object on failure, so close it here.
        try:
            imap.logout()
        except Exception:
            try:
                imap.shutdown()
            except Exception:
                pass  # best-effort: nothing more can be done with the socket
        raise
def initialize_state_if_needed(imap: imaplib.IMAP4, state_file: str) -> None:
    """Seed the state file on first run.

    Does nothing when ``load_last_uid`` already returns a value. Otherwise
    the currently selected mailbox is searched for all UIDs and the highest
    one is stored, so that existing mail is not reprocessed. When the search
    fails or the mailbox is empty, 0 is stored instead.
    """
    if load_last_uid(state_file) is not None:
        return

    logger.info("No state file found; initializing from current mailbox state...")
    status, payload = imap.uid("search", None, "ALL")
    if status != "OK":
        logger.warning("UID SEARCH ALL failed; starting from scratch (no last_uid).")
        save_last_uid(state_file, 0)
        return

    existing = parse_uid_list(payload)
    if existing:
        newest = max(existing)
        logger.info("Mailbox has existing messages; starting after UID %s.", newest)
        save_last_uid(state_file, newest)
        return

    logger.info("Mailbox is empty; starting from UID 0.")
    save_last_uid(state_file, 0)
def parse_uid_list(data: Iterable[bytes]) -> List[int]:
    """Convert IMAP UID SEARCH response data into a list of ints.

    ``data`` looks like ``[b'1 2 3 4']``. Every item of the iterable is
    parsed (not just the first), so any iterable works, including iterators.
    ``None`` or empty items are skipped and tokens that are not plain digits
    are ignored.

    Returns:
        The UIDs in the order they appear; an empty list when there are none.
    """
    uids: List[int] = []
    for chunk in data or ():
        if not chunk:
            continue
        text = chunk.decode("ascii", errors="ignore")
        uids.extend(int(token) for token in text.split() if token.isdigit())
    return uids
def fetch_message(imap: imaplib.IMAP4, uid: int) -> Message:
    """Fetch the message with the given UID and parse it.

    Up to three attempts are made, pausing between failed attempts (but not
    after the last one, which would only delay the error).

    Returns:
        The parsed ``email.message.Message``.

    Raises:
        RuntimeError: if every attempt fails; the last underlying exception
            is attached as ``__cause__``.
    """
    max_attempts = 3
    last_exc: Optional[Exception] = None
    for attempt in range(1, max_attempts + 1):
        try:
            typ, fetch_data = imap.uid("fetch", str(uid), "(RFC822)")
            if typ != "OK":
                raise RuntimeError(f"FETCH returned {typ}")
            # fetch_data is like [(b'UID RFC822 {bytes}', b'...raw message...'), b')']
            for part in fetch_data:
                if isinstance(part, tuple) and len(part) == 2:
                    return message_from_bytes(part[1])
            raise RuntimeError(f"Unexpected FETCH response shape: {fetch_data}")
        except Exception as exc:
            last_exc = exc
            logger.warning("FETCH attempt %s for UID %s failed: %s", attempt, uid, exc)
            if attempt < max_attempts:
                time.sleep(min(5, attempt * 2))
    raise RuntimeError(f"Failed to fetch UID {uid} after retries: {last_exc}") from last_exc
def extract_recipients(msg: Message) -> List[str]:
    """Collect the recipient addresses named in To, Cc and Bcc.

    Side effect: the Bcc header is removed from ``msg`` so it is not passed
    on downstream.

    Returns:
        Addresses in header order, keeping only those containing "@" and
        dropping case-insensitive duplicates (the first spelling wins).
    """
    candidates = [
        address
        for field in ("To", "Cc", "Bcc")
        for _display, address in getaddresses(msg.get_all(field, []))
        if address
    ]

    # Prevent leaking Bcc headers downstream.
    if "Bcc" in msg:
        del msg["Bcc"]

    # Keyed by lower-cased address; dict insertion order keeps first-seen order.
    unique = {}
    for address in candidates:
        if "@" in address:
            unique.setdefault(address.lower(), address)
    return list(unique.values())
def deliver_to_smtp(msg: Message, config: Config) -> None:
    """Send ``msg`` to the configured SMTP server, keeping its headers.

    The envelope sender is the address in the From header (falling back to
    "unknown@localhost"); the envelope recipients come from
    ``extract_recipients``, which also removes the Bcc header from ``msg``.
    When no recipients are found, a warning is logged and nothing is sent.

    The message is serialised once with ``msg.as_bytes()`` and that payload
    is reused for the debug dump and for every attempt. Up to three attempts
    are made, pausing between failed attempts but not after the last one.

    Raises:
        DeliveryError: if the message cannot be serialised, or if all send
            attempts fail (the underlying exception is attached as
            ``__cause__``).
    """
    from_header = msg.get("From", "")
    _sender_name, sender_addr = parseaddr(from_header)
    sender_addr = sender_addr or "unknown@localhost"
    recipients = extract_recipients(msg)
    if not recipients:
        logger.warning("No recipients found in message; skipping delivery.")
        return
    logger.info("Sending from %s to %s", sender_addr, recipients)

    # Serialise once. A message that cannot be serialised will not become
    # serialisable by retrying, so fail immediately with DeliveryError.
    try:
        payload = msg.as_bytes()
    except Exception as exc:
        raise DeliveryError(f"Failed to serialize message: {exc}") from exc

    if logger.isEnabledFor(logging.DEBUG):
        logger.debug("--- Begin message to SMTP ---")
        logger.debug(payload.decode("utf-8", errors="replace"))
        logger.debug("--- End message to SMTP ---")

    logger.info("Connecting to SMTP server...")
    max_attempts = 3
    last_exc: Optional[Exception] = None
    for attempt in range(1, max_attempts + 1):
        try:
            with smtplib.SMTP(config.smtp_host, config.smtp_port, timeout=10) as smtp:
                smtp.sendmail(sender_addr, recipients, payload)
                logger.info("Delivered message to SMTP.")
                return
        except Exception as exc:
            last_exc = exc
            logger.warning("SMTP send attempt %s failed: %s", attempt, exc)
            if attempt < max_attempts:
                time.sleep(min(5, attempt * 2))
    raise DeliveryError(f"Failed to deliver message after retries: {last_exc}") from last_exc
def poll_loop(config: Config) -> None:
    """Run the poll/deliver cycle until interrupted.

    Outer loop: open an IMAP session (``connect_imap``), seed the state file
    if needed, then poll. Any unexpected exception is logged, followed by a
    10 second pause and a reconnect. ``FatalConfigError`` / ``ValueError``
    end the process with status 1; ``KeyboardInterrupt`` ends the loop. The
    IMAP session is logged out whenever the outer iteration ends.

    Inner loop (one cycle per ``config.poll_interval`` seconds): search for
    UIDs above the stored ``last_uid``, then fetch and deliver each one in
    ascending order, persisting ``last_uid`` after every delivered message
    and after every ``DeliveryError``. A metrics line is logged per cycle.
    """
    max_search_attempts = 3
    while True:
        imap = None
        try:
            imap = connect_imap(config)
            initialize_state_if_needed(imap, config.state_file)
            session_stats = {"processed": 0, "delivered": 0, "failed": 0}
            while True:
                cycle_start = time.time()
                cycle_stats = {"processed": 0, "delivered": 0, "failed": 0}
                last_uid = load_last_uid(config.state_file) or 0
                search_criteria = f"(UID {last_uid + 1}:*)"
                logger.info("Checking for new messages with %s ...", search_criteria)

                typ, data = "NO", []
                for search_attempt in range(1, max_search_attempts + 1):
                    typ, data = imap.uid("search", None, search_criteria)
                    if typ == "OK":
                        break
                    logger.warning("UID SEARCH attempt %s failed: %s", search_attempt, typ)
                    if search_attempt < max_search_attempts:
                        time.sleep(min(5, search_attempt * 2))
                if typ != "OK":
                    logger.warning("UID SEARCH failed after retries: %s", typ)
                    # Leave the polling loop; the outer loop reconnects.
                    break

                # In IMAP, "n:*" always matches the newest message, even when
                # n is larger than its UID. Without this filter the last
                # processed message would be delivered again on every cycle.
                uids = sorted(uid for uid in parse_uid_list(data) if uid > last_uid)
                if not uids:
                    logger.info("No new messages.")
                else:
                    logger.info("Found %s new message(s): %s", len(uids), uids)
                    for uid in uids:
                        cycle_stats["processed"] += 1
                        session_stats["processed"] += 1
                        logger.info("Processing UID %s ...", uid)
                        try:
                            msg = fetch_message(imap, uid)
                            deliver_to_smtp(msg, config)
                            save_last_uid(config.state_file, uid)
                            logger.info("Finished UID %s.", uid)
                            cycle_stats["delivered"] += 1
                            session_stats["delivered"] += 1
                        except DeliveryError as e:
                            cycle_stats["failed"] += 1
                            session_stats["failed"] += 1
                            logger.error("Delivery error for UID %s: %s", uid, e)
                            save_last_uid(config.state_file, uid)
                            logger.info("Advanced state past failed UID %s to avoid reprocessing.", uid)
                        except Exception as e:
                            # State is not advanced here, but a later UID that
                            # succeeds in this cycle will move last_uid past
                            # this one.
                            logger.exception("Error processing UID %s: %s", uid, e)
                            cycle_stats["failed"] += 1
                            session_stats["failed"] += 1

                logger.info(
                    "metrics cycle_processed=%s cycle_delivered=%s cycle_failed=%s "
                    "session_processed=%s session_delivered=%s session_failed=%s "
                    "last_uid=%s cycle_duration_ms=%.0f",
                    cycle_stats["processed"],
                    cycle_stats["delivered"],
                    cycle_stats["failed"],
                    session_stats["processed"],
                    session_stats["delivered"],
                    session_stats["failed"],
                    load_last_uid(config.state_file) or 0,
                    (time.time() - cycle_start) * 1000,
                )
                logger.info("Sleeping %s seconds...", config.poll_interval)
                time.sleep(config.poll_interval)
        except KeyboardInterrupt:
            logger.info("Interrupted by user, exiting.")
            break
        except (FatalConfigError, ValueError) as e:
            logger.error("Fatal configuration error: %s", e)
            sys.exit(1)
        except Exception as e:
            logger.exception("Top-level error: %s; reconnecting after delay...", e)
            time.sleep(10)
        finally:
            if imap is not None:
                try:
                    imap.logout()
                except Exception:
                    logger.debug("Failed to logout cleanly from IMAP.")
if __name__ == "__main__":
    ensure_single_instance()
    # Config.from_env() runs before poll_loop() is entered, so poll_loop's own
    # handler for configuration errors cannot see a ValueError raised here.
    # Handle it explicitly so a bad environment gives a log line and exit
    # status 1 instead of a traceback.
    try:
        config = Config.from_env()
    except ValueError as exc:
        logger.error("Fatal configuration error: %s", exc)
        sys.exit(1)
    poll_loop(config)