robot-u-site/calendar_feeds.py

296 lines
8.8 KiB
Python

from __future__ import annotations
from dataclasses import dataclass
from datetime import UTC, datetime, time, timedelta
from zoneinfo import ZoneInfo, ZoneInfoNotFoundError
from urllib.parse import urlparse
import httpx
# Maximum number of occurrences expanded from a single recurring event.
MAX_RECURRENCE_OCCURRENCES = 120
# Number of days after "now" up to which recurring events are expanded.
RECURRENCE_LOOKAHEAD_DAYS = 365
class CalendarFeedError(RuntimeError):
    """Raised when a feed URL is unusable or the feed request is unsuccessful."""
@dataclass(frozen=True)
class CalendarEvent:
    """One dated occurrence extracted from a calendar feed."""

    # Decoded SUMMARY text of the event.
    title: str
    # Timezone-aware start of this occurrence.
    starts_at: datetime
    # Display name of the calendar the event was read from.
    source: str
    # Decoded LOCATION text, or "Calendar" when the event has no location.
    mode: str
@dataclass(frozen=True)
class CalendarFeed:
    """A fetched calendar feed together with the events parsed from it."""

    # Normalized URL the feed was requested from.
    url: str
    # Calendar display name: X-WR-CALNAME when present, otherwise derived
    # from the feed URL.
    source_name: str
    # Parsed events, sorted by start time.
    events: list[CalendarEvent]
async def fetch_calendar_feed(url: str, timeout_seconds: float) -> CalendarFeed:
    """Download an iCalendar feed and parse it into a ``CalendarFeed``.

    Args:
        url: Feed address; ``webcal``, ``http`` and ``https`` schemes are accepted.
        timeout_seconds: Timeout handed to the HTTP client.

    Returns:
        The normalized URL, the calendar's display name and its parsed events.

    Raises:
        CalendarFeedError: If the URL is empty or uses an unsupported scheme,
            or the server answers with a non-success status.
    """
    feed_url = _normalize_calendar_url(url)
    request_headers = {"User-Agent": "RobotUCalendar/0.1 (+https://aksal.cloud)"}

    async with httpx.AsyncClient(timeout=timeout_seconds, follow_redirects=True) as http_client:
        reply = await http_client.get(feed_url, headers=request_headers)

    if not reply.is_success:
        raise CalendarFeedError(f"{reply.status_code} from calendar feed {feed_url}")

    source_name, parsed_events = _parse_ics(reply.text, feed_url)
    return CalendarFeed(url=feed_url, source_name=source_name, events=parsed_events)
def _normalize_calendar_url(raw_url: str) -> str:
value = raw_url.strip()
if not value:
raise CalendarFeedError("Calendar feed URL is empty.")
parsed = urlparse(value)
if parsed.scheme == "webcal":
return parsed._replace(scheme="https").geturl()
if parsed.scheme in {"http", "https"}:
return value
raise CalendarFeedError(f"Unsupported calendar feed scheme: {parsed.scheme or 'missing'}")
def _parse_ics(raw_text: str, feed_url: str) -> tuple[str, list[CalendarEvent]]:
    """Parse iCalendar text into a calendar name and a sorted list of events.

    Only ``VEVENT`` components are read. Properties that belong to a component
    nested inside an event (for example a ``VALARM``, which may carry its own
    ``SUMMARY`` and ``DESCRIPTION``) are skipped so they cannot overwrite the
    event's own properties.

    Args:
        raw_text: Raw feed body.
        feed_url: URL the feed came from; used when deriving the calendar name.

    Returns:
        ``(calendar_name, events)`` with the events sorted by start time.
    """
    lines = _unfold_ics_lines(raw_text)
    calendar_name = _calendar_name(lines, feed_url)
    # One shared "now" so every event is filtered against the same instant.
    now = datetime.now(UTC)
    recurrence_until = now + timedelta(days=RECURRENCE_LOOKAHEAD_DAYS)

    events: list[CalendarEvent] = []
    current_event: dict[str, str] | None = None
    nested_depth = 0  # > 0 while inside a sub-component of the current event
    for line in lines:
        # Component delimiters are matched case-insensitively and regardless
        # of stray surrounding whitespace.
        marker = line.strip().upper()
        if marker == "BEGIN:VEVENT":
            current_event = {}
            nested_depth = 0
            continue
        if marker == "END:VEVENT":
            # Closes the event even if a nested component was left unclosed.
            events.extend(
                _events_from_properties(
                    current_event or {},
                    calendar_name,
                    now=now,
                    recurrence_until=recurrence_until,
                ),
            )
            current_event = None
            nested_depth = 0
            continue
        if current_event is None:
            continue
        if marker.startswith("BEGIN:"):
            nested_depth += 1
            continue
        if marker.startswith("END:"):
            nested_depth = max(0, nested_depth - 1)
            continue
        if nested_depth:
            continue
        content = _split_content_line(line)
        if content is None:
            continue
        raw_key, value = content
        current_event[raw_key] = value.strip()

    events.sort(key=lambda event: event.starts_at)
    return calendar_name, events


def _split_content_line(line: str) -> tuple[str, str] | None:
    """Split ``name[;params]:value`` at the first colon outside double quotes.

    Quoted parameter values may themselves contain colons, so a plain split on
    the first colon would cut such a line in the wrong place. Returns ``None``
    when the line contains no colon at all.
    """
    in_quotes = False
    for position, character in enumerate(line):
        if character == '"':
            in_quotes = not in_quotes
        elif character == ":" and not in_quotes:
            return line[:position], line[position + 1 :]
    # Unbalanced quotes: fall back to splitting on the first colon.
    name, separator, value = line.partition(":")
    return (name, value) if separator else None
def _unfold_ics_lines(raw_text: str) -> list[str]:
lines = raw_text.replace("\r\n", "\n").replace("\r", "\n").split("\n")
unfolded: list[str] = []
for line in lines:
if not line:
continue
if unfolded and line[:1] in {" ", "\t"}:
unfolded[-1] = f"{unfolded[-1]}{line[1:]}"
else:
unfolded.append(line)
return unfolded
def _calendar_name(lines: list[str], feed_url: str) -> str:
    """Return the calendar's display name.

    The first ``X-WR-CALNAME:`` line decides the name; when there is none, or
    its decoded value is empty, the name is derived from the feed URL instead.
    """
    marker = "X-WR-CALNAME:"
    first_match = next((line for line in lines if line.startswith(marker)), None)
    if first_match is not None:
        declared_name = _decode_ics_text(first_match.removeprefix(marker).strip())
        if declared_name:
            return declared_name
    return _calendar_host(feed_url)
def _calendar_host(feed_url: str) -> str:
parsed = urlparse(feed_url)
return parsed.hostname or "Calendar"
def _events_from_properties(
    properties: dict[str, str],
    calendar_name: str,
    *,
    now: datetime,
    recurrence_until: datetime,
) -> list[CalendarEvent]:
    """Build the events described by one VEVENT's properties.

    Properties are looked up by name with any parameters ignored, so a line
    such as ``SUMMARY;LANGUAGE=en-US:Title`` is found just like ``SUMMARY:Title``.

    Args:
        properties: Raw property keys (name plus parameters) mapped to values.
        calendar_name: Name stored as each event's ``source``.
        now: Events starting before this instant are left out.
        recurrence_until: Upper bound for expanding recurring events.

    Returns:
        No events when the title or a parseable start is missing; otherwise
        the single event if it has not started yet, or the expanded
        occurrences when a recurrence rule is present.
    """
    title = _property_text(properties, "SUMMARY")
    start_key = _find_property_key(properties, "DTSTART")
    if not title or start_key is None:
        return []

    starts_at = _parse_ics_datetime(start_key, properties[start_key])
    if starts_at is None:
        return []

    location = _property_text(properties, "LOCATION")
    event_template = {
        "title": title,
        "source": calendar_name,
        "mode": location or "Calendar",
    }

    rule_key = _find_property_key(properties, "RRULE")
    recurrence_rule = properties[rule_key] if rule_key is not None else ""
    if not recurrence_rule:
        if starts_at >= now:
            return [CalendarEvent(starts_at=starts_at, **event_template)]
        return []

    return [
        CalendarEvent(starts_at=occurrence, **event_template)
        for occurrence in _recurrence_occurrences(
            starts_at,
            recurrence_rule,
            now=now,
            recurrence_until=recurrence_until,
            start_key=start_key,
        )
    ]


def _find_property_key(properties: dict[str, str], name: str) -> str | None:
    """Return the stored key whose property name equals *name* (upper-case).

    The part of the key before the first ``;`` is compared case-insensitively;
    parameters after it are ignored. Returns ``None`` when nothing matches.
    """
    for key in properties:
        if key.split(";", 1)[0].strip().upper() == name:
            return key
    return None


def _property_text(properties: dict[str, str], name: str) -> str:
    """Return the decoded text value of property *name*, or ``""`` if absent."""
    key = _find_property_key(properties, name)
    if key is None:
        return ""
    return _decode_ics_text(properties[key].strip())
def _recurrence_occurrences(
    starts_at: datetime,
    raw_rule: str,
    *,
    now: datetime,
    recurrence_until: datetime,
    start_key: str,
) -> list[datetime]:
    """Expand a simple recurrence rule into the occurrences inside the window.

    Only the FREQ, INTERVAL, COUNT and UNTIL rule parts are read; any other
    part of the rule is ignored. An unsupported FREQ yields at most the first
    occurrence.

    Args:
        starts_at: First occurrence of the series.
        raw_rule: RRULE value, e.g. ``FREQ=WEEKLY;INTERVAL=2``.
        now: Occurrences before this instant are left out.
        recurrence_until: Occurrences after this instant are left out.
        start_key: Raw DTSTART key, used when parsing UNTIL.

    Returns:
        At most ``MAX_RECURRENCE_OCCURRENCES`` occurrences in ascending order.
    """
    rule = _parse_recurrence_rule(raw_rule)
    frequency = rule.get("FREQ", "").upper()
    interval = _positive_int(rule.get("INTERVAL"), default=1)
    # COUNT limits the whole series, past occurrences included. 0 means the
    # rule has no usable COUNT, in which case only UNTIL and the look-ahead
    # window bound the series; otherwise a long-running series would be cut
    # off before it ever reaches "now".
    series_length = _positive_int(rule.get("COUNT"), default=0)
    until = _parse_recurrence_until(rule.get("UNTIL"), start_key)
    effective_until = recurrence_until if until is None else min(until, recurrence_until)

    # For fixed-length periods, jump close to "now" instead of walking every
    # past occurrence. One period of slack absorbs UTC-offset differences.
    step = 0
    period_days = {"DAILY": interval, "WEEKLY": 7 * interval}.get(frequency)
    if period_days and now > starts_at:
        step = max(0, (now - starts_at).days // period_days - 1)

    occurrences: list[datetime] = []
    while not series_length or step < series_length:
        if step == 0:
            occurrence = starts_at
        else:
            try:
                # Always step from the original start: stepping from the
                # previous occurrence would let a clamped day of month stick
                # (Jan 31 -> Feb 28 -> Mar 28 ...).
                occurrence = _next_recurrence_occurrence(starts_at, frequency, interval * step)
            except (OverflowError, ValueError):
                # The next date is not representable, so it is certainly
                # beyond the window.
                break
            if occurrence is None:
                break
        if occurrence > effective_until:
            break
        if occurrence >= now:
            occurrences.append(occurrence)
            if len(occurrences) >= MAX_RECURRENCE_OCCURRENCES:
                break
        step += 1
    return occurrences
def _parse_recurrence_rule(raw_rule: str) -> dict[str, str]:
rule: dict[str, str] = {}
for segment in raw_rule.split(";"):
if "=" not in segment:
continue
key, value = segment.split("=", 1)
rule[key.strip().upper()] = value.strip()
return rule
def _positive_int(value: str | None, *, default: int) -> int:
if value is None:
return default
try:
parsed = int(value)
except ValueError:
return default
return parsed if parsed > 0 else default
def _parse_recurrence_until(value: str | None, start_key: str) -> datetime | None:
    """Parse an RRULE ``UNTIL`` value into an aware datetime.

    The value is first read with the parameters of the event's DTSTART key, so
    a floating time picks up the event's TZID. If that fails, it is parsed
    again with a bare key so the value's own shape decides. This covers feeds
    whose UNTIL is a date-time although DTSTART is declared ``VALUE=DATE``.

    Returns:
        The parsed instant, or ``None`` when *value* is empty or unparseable.
    """
    if not value:
        return None
    parsed = _parse_ics_datetime(start_key, value)
    if parsed is None:
        # Without this retry the end bound would be dropped and the series
        # would run on to the end of the look-ahead window.
        parsed = _parse_ics_datetime("UNTIL", value)
    return parsed
def _next_recurrence_occurrence(
occurrence: datetime,
frequency: str,
interval: int,
) -> datetime | None:
if frequency == "DAILY":
return occurrence + timedelta(days=interval)
if frequency == "WEEKLY":
return occurrence + timedelta(weeks=interval)
if frequency == "MONTHLY":
return _add_months(occurrence, interval)
if frequency == "YEARLY":
return _add_months(occurrence, interval * 12)
return None
def _add_months(value: datetime, months: int) -> datetime:
    """Return *value* shifted by *months* calendar months.

    The day of month is kept where possible and otherwise clamped to the last
    day of the target month; all other fields are left as they are.
    """
    years_forward, zero_based_month = divmod(value.month - 1 + months, 12)
    target_year = value.year + years_forward
    target_month = zero_based_month + 1
    last_day = _days_in_month(target_year, target_month)
    return value.replace(
        year=target_year,
        month=target_month,
        day=min(value.day, last_day),
    )
def _days_in_month(year: int, month: int) -> int:
if month == 12:
next_month = datetime(year + 1, 1, 1)
else:
next_month = datetime(year, month + 1, 1)
this_month = datetime(year, month, 1)
return (next_month - this_month).days
def _parse_ics_datetime(key: str, value: str) -> datetime | None:
try:
if "VALUE=DATE" in key:
parsed_date = datetime.strptime(value, "%Y%m%d").date()
return datetime.combine(parsed_date, time.min, tzinfo=UTC)
if value.endswith("Z"):
return datetime.strptime(value, "%Y%m%dT%H%M%SZ").replace(tzinfo=UTC)
if "T" in value:
return _with_ics_timezone(datetime.strptime(value, "%Y%m%dT%H%M%S"), key)
parsed_date = datetime.strptime(value, "%Y%m%d").date()
return datetime.combine(parsed_date, time.min, tzinfo=UTC)
except ValueError:
return None
def _with_ics_timezone(value: datetime, key: str) -> datetime:
    """Attach the timezone named by the key's TZID parameter to *value*.

    Falls back to UTC when the key has no TZID or the name cannot be loaded as
    a time zone.
    """
    timezone_name = _ics_timezone_name(key)
    if not timezone_name:
        return value.replace(tzinfo=UTC)
    try:
        zone = ZoneInfo(timezone_name)
    except (ZoneInfoNotFoundError, ValueError, OSError):
        # ZoneInfo raises ValueError for absolute or non-normalized keys (some
        # feeds use path-like TZIDs). OSError is caught defensively for keys
        # that resolve to something other than a readable zone file. All of
        # these get the same UTC fallback as an unknown zone.
        return value.replace(tzinfo=UTC)
    return value.replace(tzinfo=zone)
def _ics_timezone_name(key: str) -> str | None:
for segment in key.split(";")[1:]:
if segment.startswith("TZID="):
return segment.split("=", 1)[1].strip() or None
return None
def _decode_ics_text(value: str) -> str:
return (
value.replace("\\n", "\n")
.replace("\\N", "\n")
.replace("\\,", ",")
.replace("\\;", ";")
.replace("\\\\", "\\")
)