460 lines
16 KiB
Python
Executable file
460 lines
16 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
"""
|
|
Poll Proton Bridge via IMAP and inject new messages into a local SMTP server.
|
|
|
|
- Assumes Proton Bridge IMAP is running locally.
|
|
- Assumes an SMTP server is listening on a local port (e.g. 2525).
|
|
"""
|
|
|
|
import imaplib
|
|
import json
|
|
import logging
|
|
import os
|
|
import smtplib
|
|
import ssl
|
|
import sys
|
|
import time
|
|
import atexit
|
|
import errno
|
|
from dataclasses import dataclass
|
|
from email import message_from_bytes
|
|
from email.message import Message
|
|
from email.utils import getaddresses, parseaddr
|
|
from typing import Iterable, List, Optional
|
|
|
|
# Verbosity is taken from the LOG_LEVEL environment variable (default "INFO")
# and upper-cased so that values such as "debug" are accepted.
LOG_LEVEL = os.environ.get("LOG_LEVEL", "INFO").upper()

# Configure the root logger once at import time; every record carries the
# "[email_tunnel]" tag followed by timestamp, level and message.
logging.basicConfig(
    format="[email_tunnel] %(asctime)s %(levelname)s %(message)s",
    level=LOG_LEVEL,
)

# Logger shared by every function in this module.
logger = logging.getLogger("email_tunnel")

# Path of the single-instance pidfile; override with the PIDFILE variable.
PIDFILE_PATH = os.environ.get("PIDFILE", "/tmp/email_tunnel.pid")
|
|
|
|
|
|
@dataclass(frozen=True)
class Config:
    """
    Immutable runtime settings for the IMAP -> SMTP tunnel.

    Instances are normally created with :meth:`from_env`, which reads and
    validates the process environment.
    """

    imap_host: str  # IMAP_IP: host of the IMAP server to poll
    imap_port: int  # IMAP_PORT: TCP port of the IMAP server
    imap_user: str  # IMAP_USER: IMAP login name
    imap_password: str  # IMAP_PASSWORD: IMAP login password
    imap_folder: str  # IMAP_FOLDER: mailbox folder to watch
    imap_tls_verify: bool  # IMAP_TLS_VERIFY: verify the IMAP TLS certificate
    smtp_host: str  # SMTP_IP: host of the SMTP server receiving messages
    smtp_port: int  # SMTP_PORT: TCP port of that SMTP server
    poll_interval: int  # POLL_INTERVAL: seconds to sleep between polls
    state_file: str  # STATE_FILE: JSON file holding the last handled UID

    @classmethod
    def from_env(cls) -> "Config":
        """
        Build a Config from environment variables.

        Required: IMAP_IP, IMAP_PORT, SMTP_IP, SMTP_PORT, POLL_INTERVAL,
        IMAP_USER, IMAP_PASSWORD, IMAP_FOLDER, IMAP_TLS_VERIFY.
        Optional: STATE_FILE (defaults to "./state.json").

        IMAP_TLS_VERIFY disables certificate verification when it is one of
        "0", "false", "no" or "off" (case-insensitive, surrounding whitespace
        ignored); any other value keeps verification enabled.

        Raises:
            ValueError: if a required variable is missing or empty, a numeric
                variable is not a positive integer, or a port exceeds 65535.
        """
        missing: List[str] = []

        def require(name: str) -> str:
            # Record every missing name so they can all be reported at once.
            value = os.getenv(name)
            if not value:
                missing.append(name)
            return value or ""

        def positive_int(name: str, raw: str, maximum: Optional[int] = None) -> int:
            # Parse *raw* as an integer in the range 1..maximum.
            try:
                number = int(raw)
            except ValueError as exc:
                raise ValueError(f"{name} must be an integer") from exc
            if number <= 0:
                raise ValueError(f"{name} must be a positive integer")
            if maximum is not None and number > maximum:
                raise ValueError(f"{name} must be at most {maximum}")
            return number

        imap_host = require("IMAP_IP")
        imap_port_raw = require("IMAP_PORT")
        smtp_host = require("SMTP_IP")
        smtp_port_raw = require("SMTP_PORT")
        poll_interval_raw = require("POLL_INTERVAL")
        imap_user = require("IMAP_USER")
        imap_password = require("IMAP_PASSWORD")
        imap_folder = require("IMAP_FOLDER")
        imap_tls_verify_raw = require("IMAP_TLS_VERIFY")

        # Past this point every required value is a non-empty string, so no
        # further emptiness checks are needed.
        if missing:
            raise ValueError(f"Missing required environment variables: {', '.join(missing)}")

        imap_port = positive_int("IMAP_PORT", imap_port_raw, maximum=65535)
        smtp_port = positive_int("SMTP_PORT", smtp_port_raw, maximum=65535)
        poll_interval = positive_int("POLL_INTERVAL", poll_interval_raw)

        # Normalise before comparing so "FALSE" or " false " also disable
        # verification instead of silently leaving it switched on.
        tls_verify = imap_tls_verify_raw.strip().lower() not in ("0", "false", "no", "off")

        return cls(
            imap_host=imap_host,
            imap_port=imap_port,
            imap_user=imap_user,
            imap_password=imap_password,
            imap_folder=imap_folder,
            imap_tls_verify=tls_verify,
            smtp_host=smtp_host,
            smtp_port=smtp_port,
            poll_interval=poll_interval,
            state_file=os.getenv("STATE_FILE", "./state.json"),
        )
|
|
|
|
|
|
class FatalConfigError(RuntimeError):
    """
    Signal a configuration problem that retrying cannot fix.

    poll_loop() reacts to it by logging the error and exiting with status 1.
    """
|
|
|
|
|
|
class DeliveryError(RuntimeError):
    """
    Signal that a message could not be handed to SMTP after all retries.

    poll_loop() logs it and advances the stored UID past the message.
    """
|
|
|
|
|
|
def ensure_single_instance() -> None:
    """
    Claim the pidfile at PIDFILE_PATH for the current process.

    All the work is delegated to _write_pidfile(), which terminates the
    process when the pidfile belongs to another running instance.
    """
    _write_pidfile(PIDFILE_PATH, os.getpid())
|
|
|
|
|
|
def _pid_running(check_pid: int) -> bool:
    """
    Report whether a process with the given PID appears to exist.

    Signal 0 is sent with os.kill(), which performs the existence and
    permission checks without delivering a signal. Non-positive PIDs are
    never considered running.
    """
    if check_pid < 1:
        return False
    try:
        os.kill(check_pid, 0)
    except OSError as exc:
        # ESRCH / EINVAL mean "no such process"; any other errno is treated
        # as evidence that the process exists.
        return exc.errno not in (errno.ESRCH, errno.EINVAL)
    return True
|
|
|
|
|
|
def _write_pidfile(path: str, pid: int) -> None:
    """
    Atomically create the pidfile at *path* containing *pid*.

    - On success the file is scheduled for removal at interpreter exit.
    - If the file already exists and names another running process, the
      error is logged and the process exits with status 1.
    - If the file is stale (unreadable, names a dead process, or names our
      own PID) it is removed and creation is retried.
    - Any other OSError while creating the file is logged and ignored, so the
      tunnel keeps running without a pidfile.
    """
    try:
        # O_EXCL makes creation atomic. The explicit 0o644 avoids os.open's
        # default mode of 0o777, which would produce an executable pidfile.
        fd = os.open(path, os.O_CREAT | os.O_EXCL | os.O_WRONLY, 0o644)
        with os.fdopen(fd, "w") as f:
            f.write(str(pid))
        atexit.register(lambda: _remove_pidfile(path))
        logger.info("Created pidfile at %s", path)
    except FileExistsError:
        try:
            with open(path, "r") as f:
                existing_pid = int(f.read().strip() or 0)
        except (OSError, ValueError):
            # Unreadable or non-numeric content is treated as stale.
            existing_pid = 0

        # A pidfile naming our own PID cannot belong to a live foreign
        # instance: it is a leftover from an earlier process whose PID was
        # reused (typical after an unclean container restart).
        held_by_other = (
            existing_pid > 0 and existing_pid != pid and _pid_running(existing_pid)
        )
        if held_by_other:
            logger.error(
                "Pidfile %s exists and process %s is running; exiting.", path, existing_pid
            )
            sys.exit(1)

        try:
            os.remove(path)
            logger.warning("Stale pidfile %s found; replacing.", path)
            _write_pidfile(path, pid)
        except Exception as exc:
            logger.error("Failed to replace stale pidfile %s: %s", path, exc)
            sys.exit(1)
    except OSError as exc:
        logger.warning("Could not create pidfile %s: %s; continuing without pidfile.", path, exc)
|
|
|
|
|
|
def _remove_pidfile(path: str) -> None:
    """
    Delete the pidfile at *path* on a best-effort basis.

    A missing file and any other failure are deliberately ignored, so this
    function never raises an Exception.
    """
    try:
        os.unlink(path)
    except Exception:
        # Nothing useful can be done about a cleanup failure at exit time.
        pass
|
|
|
|
|
|
def load_last_uid(state_file: str) -> Optional[int]:
    """
    Read the last processed UID from the JSON state file.

    Returns:
        The stored "last_uid" as a non-negative int, or None when the file
        does not exist, cannot be read or parsed, has no "last_uid" key, or
        holds a value that is not a non-negative integer.
    """
    try:
        with open(state_file, "r") as f:
            data = json.load(f)
    except FileNotFoundError:
        # No state yet (first run): not worth a warning.
        return None
    except Exception as e:
        logger.warning("Failed to read state file: %s", e)
        return None

    if not isinstance(data, dict):
        logger.warning(
            "Failed to read state file: expected a JSON object, got %s",
            type(data).__name__,
        )
        return None

    last_uid = data.get("last_uid")
    if last_uid is None:
        return None

    # Callers compute `last_uid + 1`, so anything that is not a real
    # non-negative integer (bool is excluded explicitly) must be rejected
    # here rather than crash them later.
    if isinstance(last_uid, bool) or not isinstance(last_uid, int) or last_uid < 0:
        logger.warning("Ignoring invalid last_uid %r in state file %s", last_uid, state_file)
        return None
    return last_uid
|
|
|
|
|
|
def save_last_uid(state_file: str, uid: int) -> None:
    """
    Persist *uid* as {"last_uid": uid} in *state_file*.

    The parent directory is created when needed. The JSON is written to
    "<state_file>.tmp" first and then moved over the target with
    os.replace(), so the target is swapped in a single step.
    """
    parent_dir = os.path.dirname(state_file)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    scratch_path = f"{state_file}.tmp"
    payload = {"last_uid": uid}
    with open(scratch_path, "w") as handle:
        json.dump(payload, handle)
    os.replace(scratch_path, state_file)
|
|
|
|
|
|
def _open_imap_session(imap: imaplib.IMAP4, config: Config) -> None:
    """Upgrade *imap* to TLS, log in, and select the configured folder."""
    # Upgrade to TLS (verification can be disabled for local Proton Bridge).
    ssl_context = ssl.create_default_context()
    if not config.imap_tls_verify:
        ssl_context.check_hostname = False
        ssl_context.verify_mode = ssl.CERT_NONE
    imap.starttls(ssl_context=ssl_context)

    logger.info("Logging in to IMAP ...")
    try:
        imap.login(config.imap_user, config.imap_password)
    except imaplib.IMAP4.error as exc:
        raise FatalConfigError(f"IMAP login failed: {exc}") from exc

    logger.debug("Listing available IMAP folders...")
    typ, folders = imap.list()
    if typ != "OK" or folders is None:
        raise FatalConfigError(f"Failed to list IMAP folders: {typ}")

    available_folders = set()
    for folder in folders:
        decoded = folder.decode("utf-8", errors="ignore")
        # The folder name is taken as whatever follows the ' "/" ' delimiter
        # field of the LIST line, with surrounding quotes removed.
        available_folders.add(decoded.split(' "/" ')[-1].strip('"'))
        logger.debug("  %s", decoded)
    if config.imap_folder not in available_folders:
        raise FatalConfigError(f"IMAP folder {config.imap_folder!r} not found; aborting.")

    logger.info("Selecting folder %r ...", config.imap_folder)
    typ, _data = imap.select(config.imap_folder)
    if typ != "OK":
        raise FatalConfigError(f"IMAP select {config.imap_folder} failed: {typ}")


def connect_imap(config: Config) -> imaplib.IMAP4:
    """
    Open an authenticated IMAP connection with the configured folder selected.

    Steps: plain connect, STARTTLS, LOGIN, LIST (to confirm the folder
    exists), SELECT.

    Returns:
        The ready-to-use imaplib.IMAP4 connection.

    Raises:
        FatalConfigError: if the TCP connection fails, login is rejected,
            folders cannot be listed, or the folder is missing or cannot be
            selected.
        Exception: other errors from imaplib/ssl (for example a failed
            STARTTLS) propagate unchanged.
    """
    # Proton Bridge usually uses STARTTLS on 1143. If you configured SSL-only, switch to IMAP4_SSL.
    logger.info("Connecting to IMAP %s:%s ...", config.imap_host, config.imap_port)
    try:
        imap = imaplib.IMAP4(config.imap_host, config.imap_port)
    except OSError as exc:  # includes ConnectionRefusedError
        raise FatalConfigError(
            f"IMAP connect failed to {config.imap_host}:{config.imap_port}: {exc}"
        ) from exc

    try:
        _open_imap_session(imap, config)
    except BaseException:
        # The caller never receives the connection when setup fails, so the
        # socket must be released here or it would leak on every retry.
        try:
            imap.shutdown()
        except Exception:
            logger.debug("Failed to close IMAP connection after setup error.")
        raise
    return imap
|
|
|
|
|
|
def initialize_state_if_needed(imap: imaplib.IMAP4, state_file: str) -> None:
    """
    Seed the state file when no last UID has been recorded yet.

    The stored UID becomes the highest UID currently in the selected
    mailbox, so existing mail is not re-sent. When the mailbox is empty or
    the UID SEARCH fails, 0 is stored instead. Nothing happens when a last
    UID can already be loaded.
    """
    already_initialized = load_last_uid(state_file) is not None
    if already_initialized:
        return

    logger.info("No state file found; initializing from current mailbox state...")
    status, payload = imap.uid("search", None, "ALL")
    if status != "OK":
        logger.warning("UID SEARCH ALL failed; starting from scratch (no last_uid).")
        save_last_uid(state_file, 0)
        return

    known_uids = parse_uid_list(payload)
    if known_uids:
        highest_uid = max(known_uids)
        logger.info("Mailbox has existing messages; starting after UID %s.", highest_uid)
        save_last_uid(state_file, highest_uid)
        return

    logger.info("Mailbox is empty; starting from UID 0.")
    save_last_uid(state_file, 0)
|
|
|
|
|
|
def parse_uid_list(data: Iterable[bytes]) -> List[int]:
    """
    Convert the payload of an IMAP UID SEARCH response into a list of ints.

    IMAP UID SEARCH returns something like [b'1 2 3 4']. Every entry of
    *data* is parsed (not only the first), None entries are skipped, and
    tokens that are not plain decimal digits are ignored.

    Returns:
        The UIDs in the order they appear; an empty list when *data* is
        None, empty, or contains no UIDs.
    """
    uids: List[int] = []
    # Iterate instead of indexing so that any iterable works, as the
    # annotation promises, and no response line is silently dropped.
    for chunk in data or ():
        if chunk is None:
            continue
        text = chunk.decode("ascii", errors="ignore")
        uids.extend(int(token) for token in text.split() if token.isdigit())
    return uids
|
|
|
|
|
|
def fetch_message(imap: imaplib.IMAP4, uid: int) -> Message:
    """
    Fetch the full RFC822 message with the given UID and parse it.

    Up to three attempts are made, with a short pause between attempts
    (2 s, then 4 s).

    Returns:
        The parsed email.message.Message.

    Raises:
        RuntimeError: if every attempt fails; the last underlying error is
            attached as the cause.
    """
    max_attempts = 3
    last_exc: Optional[Exception] = None
    for attempt in range(1, max_attempts + 1):
        try:
            typ, fetch_data = imap.uid("fetch", str(uid), "(RFC822)")
            if typ != "OK":
                raise RuntimeError(f"FETCH returned {typ}")

            # fetch_data is like [(b'UID RFC822 {bytes}', b'...raw message...'), b')']
            for part in fetch_data:
                if isinstance(part, tuple) and len(part) == 2:
                    return message_from_bytes(part[1])
            raise RuntimeError(f"Unexpected FETCH response shape: {fetch_data}")
        except Exception as exc:
            last_exc = exc
            logger.warning("FETCH attempt %s for UID %s failed: %s", attempt, uid, exc)
            # Only wait when another attempt will follow.
            if attempt < max_attempts:
                time.sleep(min(5, attempt * 2))
    raise RuntimeError(f"Failed to fetch UID {uid} after retries: {last_exc}") from last_exc
|
|
|
|
|
|
def extract_recipients(msg: Message) -> List[str]:
    """
    Collect the envelope recipients named in the To, Cc and Bcc headers.

    Side effect: the Bcc header is removed from *msg* so it is not passed on
    downstream.

    Returns:
        Addresses in header order, keeping only those containing "@" and
        dropping case-insensitive duplicates (first spelling wins).
    """
    candidates = [
        address
        for header_name in ("To", "Cc", "Bcc")
        for _display_name, address in getaddresses(msg.get_all(header_name, []))
        if address
    ]

    # Prevent leaking Bcc headers downstream.
    if "Bcc" in msg:
        del msg["Bcc"]

    unique: List[str] = []
    seen_lowercase = set()
    for address in candidates:
        key = address.lower()
        if "@" not in address or key in seen_lowercase:
            continue
        seen_lowercase.add(key)
        unique.append(address)
    return unique
|
|
|
|
|
|
def deliver_to_smtp(msg: Message, config: Config) -> None:
    """
    Inject the raw RFC822 message into SMTP using original headers.

    The envelope sender is the address from the From header
    ("unknown@localhost" when absent). The envelope recipients come from
    extract_recipients(), which also strips the Bcc header from *msg*.
    A message without recipients is skipped with a warning. Up to three
    delivery attempts are made, pausing 2 s and then 4 s between them.

    Raises:
        DeliveryError: if the message cannot be serialized or every SMTP
            attempt fails; the underlying error is attached as the cause.
    """
    from_header = msg.get("From", "")
    _sender_name, sender_addr = parseaddr(from_header)
    sender_addr = sender_addr or "unknown@localhost"

    recipients = extract_recipients(msg)
    if not recipients:
        logger.warning("No recipients found in message; skipping delivery.")
        return
    logger.info("Sending from %s to %s", sender_addr, recipients)

    # Serialize once, and only after extract_recipients() removed Bcc.
    # A failure here is deterministic, so retrying would not help; it is
    # reported as DeliveryError so the caller handles it like any other
    # undeliverable message.
    try:
        payload = msg.as_bytes()
    except Exception as exc:
        raise DeliveryError(f"Failed to serialize message: {exc}") from exc

    if logger.isEnabledFor(logging.DEBUG):
        logger.debug("--- Begin message to SMTP ---")
        logger.debug("%s", payload.decode("utf-8", errors="replace"))
        logger.debug("--- End message to SMTP ---")

    logger.info("Connecting to SMTP server...")
    max_attempts = 3
    last_exc: Optional[Exception] = None
    for attempt in range(1, max_attempts + 1):
        try:
            with smtplib.SMTP(config.smtp_host, config.smtp_port, timeout=10) as smtp:
                refused = smtp.sendmail(sender_addr, recipients, payload)
        except Exception as exc:
            last_exc = exc
            logger.warning("SMTP send attempt %s failed: %s", attempt, exc)
            # Only wait when another attempt will follow.
            if attempt < max_attempts:
                time.sleep(min(5, attempt * 2))
            continue

        # sendmail() succeeds when at least one recipient was accepted and
        # returns the rejected ones; surface them instead of dropping them.
        if refused:
            logger.warning("SMTP server refused some recipients: %s", refused)
        logger.info("Delivered message to SMTP.")
        return
    raise DeliveryError(f"Failed to deliver message after retries: {last_exc}") from last_exc
|
|
|
|
|
|
def _search_new_uids(imap: imaplib.IMAP4, last_uid: int) -> Optional[List[int]]:
    """
    Return the sorted UIDs greater than *last_uid*, or None if the UID SEARCH
    still fails after three attempts.
    """
    search_criteria = f"(UID {last_uid + 1}:*)"
    logger.info("Checking for new messages with %s ...", search_criteria)

    typ, data = "NO", []
    for search_attempt in range(1, 4):
        typ, data = imap.uid("search", None, search_criteria)
        if typ == "OK":
            break
        logger.warning("UID SEARCH attempt %s failed: %s", search_attempt, typ)
        time.sleep(min(5, search_attempt * 2))
    if typ != "OK":
        logger.warning("UID SEARCH failed after retries: %s", typ)
        return None

    # Per RFC 3501 a range "n:*" always matches the message with the highest
    # UID, even when n is larger than that UID. Without this filter the
    # newest, already handled message would be delivered again every cycle.
    return sorted(uid for uid in parse_uid_list(data) if uid > last_uid)


def _process_uid(imap: imaplib.IMAP4, uid: int, config: Config) -> str:
    """
    Fetch and deliver one message; return "delivered" or "failed".

    The stored UID is advanced on success and on DeliveryError. It is left
    untouched on any other error.
    """
    logger.info("Processing UID %s ...", uid)
    try:
        msg = fetch_message(imap, uid)
        deliver_to_smtp(msg, config)
        save_last_uid(config.state_file, uid)
        logger.info("Finished UID %s.", uid)
        return "delivered"
    except DeliveryError as e:
        logger.error("Delivery error for UID %s: %s", uid, e)
        save_last_uid(config.state_file, uid)
        logger.info("Advanced state past failed UID %s to avoid reprocessing.", uid)
        return "failed"
    except Exception as e:
        logger.exception("Error processing UID %s: %s", uid, e)
        return "failed"


def poll_loop(config: Config) -> None:
    """
    Run the tunnel until interrupted.

    Outer loop: (re)connect to IMAP and make sure the state file exists.
    Inner loop: search for UIDs above the stored one, fetch and deliver each
    message in ascending UID order, log a metrics line, then sleep for
    config.poll_interval seconds.

    Error handling:
        - KeyboardInterrupt: log and return.
        - FatalConfigError / ValueError: log and exit with status 1.
        - any other exception: log, wait 10 seconds, reconnect.
        - a UID SEARCH that keeps failing: reconnect.
    The IMAP connection is logged out whenever a session ends.
    """
    while True:
        imap = None
        try:
            imap = connect_imap(config)
            initialize_state_if_needed(imap, config.state_file)
            session_stats = {"processed": 0, "delivered": 0, "failed": 0}
            while True:
                cycle_start = time.time()
                cycle_stats = {"processed": 0, "delivered": 0, "failed": 0}
                last_uid = load_last_uid(config.state_file) or 0

                uids = _search_new_uids(imap, last_uid)
                if uids is None:
                    # Search is broken on this connection; start a new one.
                    break

                if not uids:
                    logger.info("No new messages.")
                else:
                    logger.info("Found %s new message(s): %s", len(uids), uids)
                    for uid in uids:
                        outcome = _process_uid(imap, uid, config)
                        for stats in (cycle_stats, session_stats):
                            stats["processed"] += 1
                            stats[outcome] += 1

                logger.info(
                    "metrics cycle_processed=%s cycle_delivered=%s cycle_failed=%s "
                    "session_processed=%s session_delivered=%s session_failed=%s "
                    "last_uid=%s cycle_duration_ms=%.0f",
                    cycle_stats["processed"],
                    cycle_stats["delivered"],
                    cycle_stats["failed"],
                    session_stats["processed"],
                    session_stats["delivered"],
                    session_stats["failed"],
                    load_last_uid(config.state_file) or 0,
                    (time.time() - cycle_start) * 1000,
                )
                logger.info("Sleeping %s seconds...", config.poll_interval)
                time.sleep(config.poll_interval)

        except KeyboardInterrupt:
            logger.info("Interrupted by user, exiting.")
            break
        except (FatalConfigError, ValueError) as e:
            logger.error("Fatal configuration error: %s", e)
            sys.exit(1)
        except Exception as e:
            logger.exception("Top-level error: %s; reconnecting after delay...", e)
            time.sleep(10)
        finally:
            if imap is not None:
                try:
                    imap.logout()
                except Exception:
                    logger.debug("Failed to logout cleanly from IMAP.")
|
|
|
|
|
|
if __name__ == "__main__":
    ensure_single_instance()
    # Config.from_env() runs before poll_loop() is entered, so the fatal-error
    # handler inside poll_loop() never sees its ValueError. Report it the same
    # way here instead of ending with a raw traceback.
    try:
        startup_config = Config.from_env()
    except ValueError as exc:
        logger.error("Fatal configuration error: %s", exc)
        sys.exit(1)
    poll_loop(startup_config)
|