Add email_tunnel script and README for Proton Bridge IMAP integration
This commit is contained in:
commit
c10b9e634d
2 changed files with 514 additions and 0 deletions
62
README.md
Normal file
62
README.md
Normal file
|
|
@ -0,0 +1,62 @@
|
|||
# email_tunnel
|
||||
|
||||
Poll Proton Bridge via IMAP and inject messages into a local SMTP server (e.g., Haraka). The script is designed for headless, continuous operation with basic safety/observability baked in.
|
||||
|
||||
## What it does
|
||||
- Connects to Proton Bridge IMAP via STARTTLS and selects a required folder.
|
||||
- Polls for new messages by UID, fetches each, and relays it verbatim to a local SMTP server using the original headers.
|
||||
- Strips `Bcc`, deduplicates recipients, and skips malformed addresses.
|
||||
- Tracks progress in a JSON state file so messages are processed once, even across restarts.
|
||||
- Retries IMAP fetch/search and SMTP delivery with bounded attempts/backoff; advances state past permanent delivery failures to avoid loops.
|
||||
- Enforces single-instance execution via a pidfile lock.
|
||||
- Emits scrape-friendly INFO logs each cycle with counts, last UID, and cycle duration.
|
||||
|
||||
## Requirements
|
||||
- Python 3.x
|
||||
- Proton Bridge IMAP reachable at the configured host/port
|
||||
- Local SMTP server reachable (e.g., Haraka bound to a non-25 port)
|
||||
|
||||
## Configuration (environment variables)
|
||||
- `IMAP_IP` (required): Proton Bridge IMAP host.
|
||||
- `IMAP_PORT` (required): Proton Bridge IMAP port (typically 1143).
|
||||
- `IMAP_USER` (required): IMAP username.
|
||||
- `IMAP_PASSWORD` (required): IMAP password.
|
||||
- `IMAP_FOLDER` (required): IMAP folder to poll; must exist (no fallback).
|
||||
- `IMAP_TLS_VERIFY` (default `1`): Set to `0`/`false` to disable cert/hostname checks (dev only).
|
||||
- `SMTP_IP` (default `127.0.0.1`): SMTP host to deliver to.
|
||||
- `SMTP_PORT` (default `2525`): SMTP port to deliver to.
|
||||
- `POLL_INTERVAL` (default `15`): Seconds between mailbox polls.
|
||||
- `STATE_FILE` (default `./state.json`): Path to persist last processed UID.
|
||||
- `PIDFILE` (default `/tmp/email_tunnel.pid`): Pidfile for single-instance lock.
|
||||
- `LOG_LEVEL` (default `INFO`): Standard logging levels (`DEBUG`, `INFO`, etc.).
|
||||
|
||||
## Usage
|
||||
```bash
|
||||
export IMAP_IP=127.0.0.1
|
||||
export IMAP_PORT=1143
|
||||
export IMAP_USER=...
|
||||
export IMAP_PASSWORD=...
|
||||
export IMAP_FOLDER=loomio
|
||||
export SMTP_IP=127.0.0.1
|
||||
export SMTP_PORT=2525
|
||||
python3 email_tunnel.py
|
||||
```
|
||||
|
||||
Run under a process supervisor (systemd, supervisord, etc.) so restarts and log rotation are handled.
|
||||
|
||||
## Operational behavior
|
||||
- State tracking: `STATE_FILE` records the last successfully handled UID. It is only advanced after successful delivery, or after a logged `DeliveryError` to avoid reprocessing a permanently failed message.
|
||||
- Retries: IMAP SEARCH/FETCH and SMTP SEND are retried up to 3 times with small backoff. Persistent failures log and move on.
|
||||
- Single instance: pidfile lock prevents multiple concurrent runs; stale pidfiles are replaced if the recorded PID is not running.
|
||||
- Metrics logging: every poll cycle emits a single INFO line with `metrics` prefix and key/value pairs (`cycle_processed`, `cycle_delivered`, `cycle_failed`, `session_*`, `last_uid`, `cycle_duration_ms`).
|
||||
- Missing state: if `STATE_FILE` is absent on startup, the script initializes it to the current max UID (or 0 for an empty mailbox) to avoid reprocessing the entire mailbox.
|
||||
|
||||
## Notes and safety
|
||||
- Leave `IMAP_TLS_VERIFY` enabled in production; disable only for trusted, local endpoints.
|
||||
- Ensure the `STATE_FILE` and `PIDFILE` paths are writable by the service user.
|
||||
- If delivering to a server that rejects certain recipients, the script will log the error, advance the UID, and continue.
|
||||
|
||||
## Troubleshooting
|
||||
- Increase verbosity with `LOG_LEVEL=DEBUG` to see raw message dumps and IMAP folder listings.
|
||||
- If the script exits immediately, verify required env vars and that `IMAP_FOLDER` exists on the server.
|
||||
- For pidfile conflicts, check the process holding the PID or remove a stale pidfile if no process is running.***
|
||||
452
email_tunnel.py
Executable file
452
email_tunnel.py
Executable file
|
|
@ -0,0 +1,452 @@
|
|||
#!/usr/bin/env python3
|
||||
"""
|
||||
Poll Proton Bridge via IMAP and inject new messages into a local SMTP server.
|
||||
|
||||
- Assumes Proton Bridge IMAP is running locally.
|
||||
- Assumes an SMTP server is listening on a local port (e.g. 2525).
|
||||
"""
|
||||
|
||||
import imaplib
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import smtplib
|
||||
import ssl
|
||||
import sys
|
||||
import time
|
||||
import atexit
|
||||
import errno
|
||||
from dataclasses import dataclass
|
||||
from email import message_from_bytes
|
||||
from email.message import Message
|
||||
from email.utils import getaddresses, parseaddr
|
||||
from typing import Iterable, List, Optional
|
||||
|
||||
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()
|
||||
logging.basicConfig(
|
||||
level=LOG_LEVEL,
|
||||
format="[email_tunnel] %(asctime)s %(levelname)s %(message)s",
|
||||
)
|
||||
logger = logging.getLogger("email_tunnel")
|
||||
PIDFILE_PATH = os.getenv("PIDFILE", "/tmp/email_tunnel.pid")
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class Config:
|
||||
imap_host: str
|
||||
imap_port: int
|
||||
imap_user: str
|
||||
imap_password: str
|
||||
imap_folder: str
|
||||
imap_tls_verify: bool
|
||||
smtp_host: str
|
||||
smtp_port: int
|
||||
poll_interval: int
|
||||
state_file: str
|
||||
|
||||
@classmethod
|
||||
def from_env(cls) -> "Config":
|
||||
missing: List[str] = []
|
||||
|
||||
def require(name: str) -> str:
|
||||
value = os.getenv(name)
|
||||
if not value:
|
||||
missing.append(name)
|
||||
return value or ""
|
||||
|
||||
imap_host = require("IMAP_IP")
|
||||
imap_port_raw = require("IMAP_PORT")
|
||||
smtp_host = require("SMTP_IP")
|
||||
smtp_port_raw = require("SMTP_PORT")
|
||||
poll_interval_raw = require("POLL_INTERVAL")
|
||||
imap_user = require("IMAP_USER")
|
||||
imap_password = require("IMAP_PASSWORD")
|
||||
imap_folder = require("IMAP_FOLDER")
|
||||
imap_tls_verify_raw = require("IMAP_TLS_VERIFY")
|
||||
|
||||
if missing:
|
||||
raise ValueError(f"Missing required environment variables: {', '.join(missing)}")
|
||||
|
||||
try:
|
||||
imap_port = int(imap_port_raw)
|
||||
except ValueError:
|
||||
raise ValueError("IMAP_PORT must be an integer")
|
||||
|
||||
try:
|
||||
smtp_port = int(smtp_port_raw)
|
||||
except ValueError:
|
||||
raise ValueError("SMTP_PORT must be an integer")
|
||||
|
||||
try:
|
||||
poll_interval = int(poll_interval_raw)
|
||||
except ValueError:
|
||||
raise ValueError("POLL_INTERVAL must be an integer")
|
||||
|
||||
if len(imap_host) == 0:
|
||||
raise ValueError("IMAP_IP must be a valid string")
|
||||
if imap_port <= 0:
|
||||
raise ValueError("IMAP_PORT must be a positive integer")
|
||||
if smtp_port <= 0:
|
||||
raise ValueError("SMTP_PORT must be a positive integer")
|
||||
if poll_interval <= 0:
|
||||
raise ValueError("POLL_INTERVAL must be a positive integer")
|
||||
if len(imap_user) == 0:
|
||||
raise ValueError("IMAP_USER must be a valid string")
|
||||
if len(imap_password) == 0:
|
||||
raise ValueError("IMAP_PASSWORD must be a valid string")
|
||||
if len(imap_folder) == 0:
|
||||
raise ValueError("IMAP_FOLDER must be a valid string")
|
||||
|
||||
return cls(
|
||||
imap_host=imap_host,
|
||||
imap_port=imap_port,
|
||||
imap_user=imap_user,
|
||||
imap_password=imap_password,
|
||||
imap_folder=imap_folder,
|
||||
imap_tls_verify=imap_tls_verify_raw not in ("0", "false", "False"),
|
||||
smtp_host=smtp_host,
|
||||
smtp_port=smtp_port,
|
||||
poll_interval=poll_interval,
|
||||
state_file=os.getenv("STATE_FILE", "./state.json"),
|
||||
)
|
||||
|
||||
|
||||
class FatalConfigError(RuntimeError):
|
||||
"""Raised for configuration problems that require immediate exit."""
|
||||
|
||||
|
||||
class DeliveryError(RuntimeError):
|
||||
"""Raised when a message cannot be delivered after retries."""
|
||||
|
||||
|
||||
def ensure_single_instance() -> None:
|
||||
"""
|
||||
Enforce single instance via pidfile.
|
||||
"""
|
||||
pid = os.getpid()
|
||||
_write_pidfile(PIDFILE_PATH, pid)
|
||||
|
||||
|
||||
def _pid_running(check_pid: int) -> bool:
|
||||
if check_pid <= 0:
|
||||
return False
|
||||
try:
|
||||
os.kill(check_pid, 0)
|
||||
except OSError as exc:
|
||||
if exc.errno in (errno.ESRCH, errno.EINVAL):
|
||||
return False
|
||||
return True
|
||||
else:
|
||||
return True
|
||||
|
||||
|
||||
def _write_pidfile(path: str, pid: int) -> None:
|
||||
try:
|
||||
fd = os.open(path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
|
||||
with os.fdopen(fd, "w") as f:
|
||||
f.write(str(pid))
|
||||
atexit.register(lambda: _remove_pidfile(path))
|
||||
logger.info("Created pidfile at %s", path)
|
||||
except FileExistsError:
|
||||
try:
|
||||
with open(path, "r") as f:
|
||||
existing_pid = int(f.read().strip() or 0)
|
||||
except Exception:
|
||||
existing_pid = 0
|
||||
|
||||
if existing_pid > 0 and _pid_running(existing_pid):
|
||||
logger.error(
|
||||
"Pidfile %s exists and process %s is running; exiting.", path, existing_pid
|
||||
)
|
||||
sys.exit(1)
|
||||
else:
|
||||
try:
|
||||
os.remove(path)
|
||||
logger.warning("Stale pidfile %s found; replacing.", path)
|
||||
_write_pidfile(path, pid)
|
||||
return
|
||||
except Exception as exc:
|
||||
logger.error("Failed to replace stale pidfile %s: %s", path, exc)
|
||||
sys.exit(1)
|
||||
except OSError as exc:
|
||||
logger.warning("Could not create pidfile %s: %s; continuing without pidfile.", path, exc)
|
||||
|
||||
|
||||
def _remove_pidfile(path: str) -> None:
|
||||
try:
|
||||
if os.path.exists(path):
|
||||
os.remove(path)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
def load_last_uid(state_file: str) -> Optional[int]:
|
||||
if not os.path.exists(state_file):
|
||||
return None
|
||||
try:
|
||||
with open(state_file, "r") as f:
|
||||
data = json.load(f)
|
||||
return data.get("last_uid")
|
||||
except Exception as e:
|
||||
logger.warning("Failed to read state file: %s", e)
|
||||
return None
|
||||
|
||||
|
||||
def save_last_uid(state_file: str, uid: int) -> None:
|
||||
state_dir = os.path.dirname(state_file)
|
||||
if state_dir:
|
||||
os.makedirs(state_dir, exist_ok=True)
|
||||
|
||||
tmp = f"{state_file}.tmp"
|
||||
with open(tmp, "w") as f:
|
||||
json.dump({"last_uid": uid}, f)
|
||||
os.replace(tmp, state_file)
|
||||
|
||||
|
||||
def connect_imap(config: Config) -> imaplib.IMAP4:
|
||||
# Proton Bridge usually uses STARTTLS on 1143. If you configured SSL-only, switch to IMAP4_SSL.
|
||||
logger.info("Connecting to IMAP %s:%s ...", config.imap_host, config.imap_port)
|
||||
imap = imaplib.IMAP4(config.imap_host, config.imap_port)
|
||||
|
||||
# Upgrade to TLS (verification can be disabled for local Proton Bridge).
|
||||
ssl_context = ssl.create_default_context()
|
||||
if not config.imap_tls_verify:
|
||||
ssl_context.check_hostname = False
|
||||
ssl_context.verify_mode = ssl.CERT_NONE
|
||||
imap.starttls(ssl_context=ssl_context)
|
||||
|
||||
logger.info("Logging in to IMAP ...")
|
||||
imap.login(config.imap_user, config.imap_password)
|
||||
|
||||
logger.debug("Listing available IMAP folders...")
|
||||
typ, folders = imap.list()
|
||||
available_folders = set()
|
||||
if typ != "OK" or folders is None:
|
||||
raise FatalConfigError(f"Failed to list IMAP folders: {typ}")
|
||||
|
||||
for folder in folders:
|
||||
decoded = folder.decode("utf-8", errors="ignore")
|
||||
available_folders.add(decoded.split(' "/" ')[-1].strip('"'))
|
||||
logger.debug(" %s", decoded)
|
||||
if config.imap_folder not in available_folders:
|
||||
raise FatalConfigError(f"IMAP folder {config.imap_folder!r} not found; aborting.")
|
||||
|
||||
logger.info("Selecting folder %r ...", config.imap_folder)
|
||||
typ, _data = imap.select(config.imap_folder)
|
||||
if typ != "OK":
|
||||
raise FatalConfigError(f"IMAP select {config.imap_folder} failed: {typ}")
|
||||
return imap
|
||||
|
||||
|
||||
def initialize_state_if_needed(imap: imaplib.IMAP4, state_file: str) -> None:
|
||||
"""
|
||||
On first run (no state file), set last_uid to the current max UID,
|
||||
so we don't reprocess the entire mailbox.
|
||||
"""
|
||||
if load_last_uid(state_file) is not None:
|
||||
return
|
||||
|
||||
logger.info("No state file found; initializing from current mailbox state...")
|
||||
typ, data = imap.uid("search", None, "ALL")
|
||||
if typ != "OK":
|
||||
logger.warning("UID SEARCH ALL failed; starting from scratch (no last_uid).")
|
||||
save_last_uid(state_file, 0)
|
||||
return
|
||||
|
||||
uids = parse_uid_list(data)
|
||||
if not uids:
|
||||
logger.info("Mailbox is empty; starting from UID 0.")
|
||||
save_last_uid(state_file, 0)
|
||||
else:
|
||||
max_uid = max(uids)
|
||||
logger.info("Mailbox has existing messages; starting after UID %s.", max_uid)
|
||||
save_last_uid(state_file, max_uid)
|
||||
|
||||
|
||||
def parse_uid_list(data: Iterable[bytes]) -> List[int]:
|
||||
"""
|
||||
IMAP UID SEARCH returns something like [b'1 2 3 4'].
|
||||
Return list of ints.
|
||||
"""
|
||||
if not data or data[0] is None:
|
||||
return []
|
||||
raw = data[0].decode("ascii", errors="ignore").strip()
|
||||
if not raw:
|
||||
return []
|
||||
return [int(x) for x in raw.split() if x.isdigit()]
|
||||
|
||||
|
||||
def fetch_message(imap: imaplib.IMAP4, uid: int) -> Message:
|
||||
attempt = 0
|
||||
last_exc: Optional[Exception] = None
|
||||
while attempt < 3:
|
||||
attempt += 1
|
||||
try:
|
||||
typ, fetch_data = imap.uid("fetch", str(uid), "(RFC822)")
|
||||
if typ != "OK":
|
||||
raise RuntimeError(f"FETCH returned {typ}")
|
||||
|
||||
# fetch_data is like [(b'UID RFC822 {bytes}', b'...raw message...'), b')']
|
||||
for part in fetch_data:
|
||||
if isinstance(part, tuple) and len(part) == 2:
|
||||
return message_from_bytes(part[1])
|
||||
raise RuntimeError(f"Unexpected FETCH response shape: {fetch_data}")
|
||||
except Exception as exc:
|
||||
last_exc = exc
|
||||
logger.warning("FETCH attempt %s for UID %s failed: %s", attempt, uid, exc)
|
||||
time.sleep(min(5, attempt * 2))
|
||||
raise RuntimeError(f"Failed to fetch UID {uid} after retries: {last_exc}")
|
||||
|
||||
|
||||
def extract_recipients(msg: Message) -> List[str]:
|
||||
recipients: List[str] = []
|
||||
for header in ("To", "Cc", "Bcc"):
|
||||
for _name, addr in getaddresses(msg.get_all(header, [])):
|
||||
if addr:
|
||||
recipients.append(addr)
|
||||
# Prevent leaking Bcc headers downstream.
|
||||
if "Bcc" in msg:
|
||||
del msg["Bcc"]
|
||||
cleaned = []
|
||||
seen = set()
|
||||
for addr in recipients:
|
||||
if "@" not in addr:
|
||||
continue
|
||||
addr_lower = addr.lower()
|
||||
if addr_lower in seen:
|
||||
continue
|
||||
seen.add(addr_lower)
|
||||
cleaned.append(addr)
|
||||
return cleaned
|
||||
|
||||
|
||||
def deliver_to_smtp(msg: Message, config: Config) -> None:
|
||||
"""
|
||||
Inject the raw RFC822 message into SMTP using original headers.
|
||||
"""
|
||||
from_header = msg.get("From", "")
|
||||
_sender_name, sender_addr = parseaddr(from_header)
|
||||
sender_addr = sender_addr or "unknown@localhost"
|
||||
|
||||
recipients = extract_recipients(msg)
|
||||
if not recipients:
|
||||
logger.warning("No recipients found in message; skipping delivery.")
|
||||
return
|
||||
logger.info("Sending from %s to %s", sender_addr, recipients)
|
||||
|
||||
if logger.isEnabledFor(logging.DEBUG):
|
||||
try:
|
||||
raw_bytes = msg.as_bytes()
|
||||
logger.debug("--- Begin message to SMTP ---")
|
||||
logger.debug(raw_bytes.decode("utf-8", errors="replace"))
|
||||
logger.debug("--- End message to SMTP ---")
|
||||
except Exception as e:
|
||||
logger.debug("Failed to dump message contents: %s", e)
|
||||
|
||||
logger.info("Connecting to SMTP server...")
|
||||
attempt = 0
|
||||
last_exc: Optional[Exception] = None
|
||||
while attempt < 3:
|
||||
attempt += 1
|
||||
try:
|
||||
with smtplib.SMTP(config.smtp_host, config.smtp_port, timeout=10) as smtp:
|
||||
smtp.sendmail(sender_addr, recipients, msg.as_bytes())
|
||||
logger.info("Delivered message to SMTP.")
|
||||
return
|
||||
except Exception as exc:
|
||||
last_exc = exc
|
||||
logger.warning("SMTP send attempt %s failed: %s", attempt, exc)
|
||||
time.sleep(min(5, attempt * 2))
|
||||
raise DeliveryError(f"Failed to deliver message after retries: {last_exc}")
|
||||
|
||||
|
||||
def poll_loop(config: Config) -> None:
|
||||
while True:
|
||||
imap = None
|
||||
try:
|
||||
imap = connect_imap(config)
|
||||
initialize_state_if_needed(imap, config.state_file)
|
||||
session_stats = {"processed": 0, "delivered": 0, "failed": 0}
|
||||
while True:
|
||||
cycle_start = time.time()
|
||||
cycle_stats = {"processed": 0, "delivered": 0, "failed": 0}
|
||||
last_uid = load_last_uid(config.state_file) or 0
|
||||
search_criteria = f"(UID {last_uid + 1}:*)"
|
||||
logger.info("Checking for new messages with %s ...", search_criteria)
|
||||
|
||||
search_attempt = 0
|
||||
data = []
|
||||
while search_attempt < 3:
|
||||
search_attempt += 1
|
||||
typ, data = imap.uid("search", None, search_criteria)
|
||||
if typ == "OK":
|
||||
break
|
||||
logger.warning("UID SEARCH attempt %s failed: %s", search_attempt, typ)
|
||||
time.sleep(min(5, search_attempt * 2))
|
||||
if typ != "OK":
|
||||
logger.warning("UID SEARCH failed after retries: %s", typ)
|
||||
break
|
||||
|
||||
uids = parse_uid_list(data)
|
||||
if not uids:
|
||||
logger.info("No new messages.")
|
||||
else:
|
||||
logger.info("Found %s new message(s): %s", len(uids), uids)
|
||||
for uid in sorted(uids):
|
||||
cycle_stats["processed"] += 1
|
||||
session_stats["processed"] += 1
|
||||
logger.info("Processing UID %s ...", uid)
|
||||
try:
|
||||
msg = fetch_message(imap, uid)
|
||||
deliver_to_smtp(msg, config)
|
||||
save_last_uid(config.state_file, uid)
|
||||
logger.info("Finished UID %s.", uid)
|
||||
cycle_stats["delivered"] += 1
|
||||
session_stats["delivered"] += 1
|
||||
except DeliveryError as e:
|
||||
cycle_stats["failed"] += 1
|
||||
session_stats["failed"] += 1
|
||||
logger.error("Delivery error for UID %s: %s", uid, e)
|
||||
save_last_uid(config.state_file, uid)
|
||||
logger.info("Advanced state past failed UID %s to avoid reprocessing.", uid)
|
||||
except Exception as e:
|
||||
logger.exception("Error processing UID %s: %s", uid, e)
|
||||
cycle_stats["failed"] += 1
|
||||
session_stats["failed"] += 1
|
||||
|
||||
logger.info(
|
||||
"metrics cycle_processed=%s cycle_delivered=%s cycle_failed=%s "
|
||||
"session_processed=%s session_delivered=%s session_failed=%s "
|
||||
"last_uid=%s cycle_duration_ms=%.0f",
|
||||
cycle_stats["processed"],
|
||||
cycle_stats["delivered"],
|
||||
cycle_stats["failed"],
|
||||
session_stats["processed"],
|
||||
session_stats["delivered"],
|
||||
session_stats["failed"],
|
||||
load_last_uid(config.state_file) or 0,
|
||||
(time.time() - cycle_start) * 1000,
|
||||
)
|
||||
logger.info("Sleeping %s seconds...", config.poll_interval)
|
||||
time.sleep(config.poll_interval)
|
||||
|
||||
except KeyboardInterrupt:
|
||||
logger.info("Interrupted by user, exiting.")
|
||||
break
|
||||
except (FatalConfigError, ValueError) as e:
|
||||
logger.error("Fatal configuration error: %s", e)
|
||||
sys.exit(1)
|
||||
except Exception as e:
|
||||
logger.exception("Top-level error: %s; reconnecting after delay...", e)
|
||||
time.sleep(10)
|
||||
finally:
|
||||
if imap is not None:
|
||||
try:
|
||||
imap.logout()
|
||||
except Exception:
|
||||
logger.debug("Failed to logout cleanly from IMAP.")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
ensure_single_instance()
|
||||
poll_loop(Config.from_env())
|
||||
Loading…
Add table
Add a link
Reference in a new issue