from __future__ import annotations from dataclasses import dataclass from datetime import UTC, datetime, time, timedelta from zoneinfo import ZoneInfo, ZoneInfoNotFoundError from urllib.parse import urlparse import httpx MAX_RECURRENCE_OCCURRENCES = 120 RECURRENCE_LOOKAHEAD_DAYS = 365 class CalendarFeedError(RuntimeError): pass @dataclass(frozen=True) class CalendarEvent: title: str starts_at: datetime source: str mode: str @dataclass(frozen=True) class CalendarFeed: url: str source_name: str events: list[CalendarEvent] async def fetch_calendar_feed(url: str, timeout_seconds: float) -> CalendarFeed: normalized_url = _normalize_calendar_url(url) async with httpx.AsyncClient(timeout=timeout_seconds, follow_redirects=True) as client: response = await client.get( normalized_url, headers={"User-Agent": "RobotUCalendar/0.1 (+https://aksal.cloud)"}, ) if not response.is_success: raise CalendarFeedError(f"{response.status_code} from calendar feed {normalized_url}") calendar_name, events = _parse_ics(response.text, normalized_url) return CalendarFeed(url=normalized_url, source_name=calendar_name, events=events) def _normalize_calendar_url(raw_url: str) -> str: value = raw_url.strip() if not value: raise CalendarFeedError("Calendar feed URL is empty.") parsed = urlparse(value) if parsed.scheme == "webcal": return parsed._replace(scheme="https").geturl() if parsed.scheme in {"http", "https"}: return value raise CalendarFeedError(f"Unsupported calendar feed scheme: {parsed.scheme or 'missing'}") def _parse_ics(raw_text: str, feed_url: str) -> tuple[str, list[CalendarEvent]]: lines = _unfold_ics_lines(raw_text) calendar_name = _calendar_name(lines, feed_url) now = datetime.now(UTC) recurrence_until = now + timedelta(days=RECURRENCE_LOOKAHEAD_DAYS) events: list[CalendarEvent] = [] current_event: dict[str, str] | None = None for line in lines: if line == "BEGIN:VEVENT": current_event = {} continue if line == "END:VEVENT": events.extend( _events_from_properties( current_event or {}, calendar_name, now=now, recurrence_until=recurrence_until, ), ) current_event = None continue if current_event is None or ":" not in line: continue raw_key, value = line.split(":", 1) current_event[raw_key] = value.strip() events.sort(key=lambda event: event.starts_at) return calendar_name, events def _unfold_ics_lines(raw_text: str) -> list[str]: lines = raw_text.replace("\r\n", "\n").replace("\r", "\n").split("\n") unfolded: list[str] = [] for line in lines: if not line: continue if unfolded and line[:1] in {" ", "\t"}: unfolded[-1] = f"{unfolded[-1]}{line[1:]}" else: unfolded.append(line) return unfolded def _calendar_name(lines: list[str], feed_url: str) -> str: for line in lines: if line.startswith("X-WR-CALNAME:"): return _decode_ics_text(line.split(":", 1)[1].strip()) or _calendar_host(feed_url) return _calendar_host(feed_url) def _calendar_host(feed_url: str) -> str: parsed = urlparse(feed_url) return parsed.hostname or "Calendar" def _events_from_properties( properties: dict[str, str], calendar_name: str, *, now: datetime, recurrence_until: datetime, ) -> list[CalendarEvent]: title = _decode_ics_text(properties.get("SUMMARY", "").strip()) start_key = next((key for key in properties if key.startswith("DTSTART")), None) if not title or not start_key: return [] starts_at = _parse_ics_datetime(start_key, properties[start_key]) if starts_at is None: return [] location = _decode_ics_text(properties.get("LOCATION", "").strip()) event_template = { "title": title, "source": calendar_name, "mode": location or "Calendar", } recurrence_rule = properties.get("RRULE") if not recurrence_rule: if starts_at >= now: return [CalendarEvent(starts_at=starts_at, **event_template)] return [] return [ CalendarEvent(starts_at=occurrence, **event_template) for occurrence in _recurrence_occurrences( starts_at, recurrence_rule, now=now, recurrence_until=recurrence_until, start_key=start_key, ) ] def _recurrence_occurrences( starts_at: datetime, raw_rule: str, *, now: datetime, recurrence_until: datetime, start_key: str, ) -> list[datetime]: rule = _parse_recurrence_rule(raw_rule) frequency = rule.get("FREQ", "").upper() interval = _positive_int(rule.get("INTERVAL"), default=1) count = _positive_int(rule.get("COUNT"), default=MAX_RECURRENCE_OCCURRENCES) until = _parse_recurrence_until(rule.get("UNTIL"), start_key) effective_until = min( [candidate for candidate in (until, recurrence_until) if candidate is not None], default=recurrence_until, ) occurrences: list[datetime] = [] occurrence = starts_at for index in range(count): if occurrence > effective_until: break if occurrence >= now: occurrences.append(occurrence) if len(occurrences) >= MAX_RECURRENCE_OCCURRENCES: break next_occurrence = _next_recurrence_occurrence(occurrence, frequency, interval) if next_occurrence is None or next_occurrence <= occurrence: break occurrence = next_occurrence return occurrences def _parse_recurrence_rule(raw_rule: str) -> dict[str, str]: rule: dict[str, str] = {} for segment in raw_rule.split(";"): if "=" not in segment: continue key, value = segment.split("=", 1) rule[key.strip().upper()] = value.strip() return rule def _positive_int(value: str | None, *, default: int) -> int: if value is None: return default try: parsed = int(value) except ValueError: return default return parsed if parsed > 0 else default def _parse_recurrence_until(value: str | None, start_key: str) -> datetime | None: if not value: return None return _parse_ics_datetime(start_key, value) def _next_recurrence_occurrence( occurrence: datetime, frequency: str, interval: int, ) -> datetime | None: if frequency == "DAILY": return occurrence + timedelta(days=interval) if frequency == "WEEKLY": return occurrence + timedelta(weeks=interval) if frequency == "MONTHLY": return _add_months(occurrence, interval) if frequency == "YEARLY": return _add_months(occurrence, interval * 12) return None def _add_months(value: datetime, months: int) -> datetime: month_index = value.month - 1 + months year = value.year + month_index // 12 month = month_index % 12 + 1 day = min(value.day, _days_in_month(year, month)) return value.replace(year=year, month=month, day=day) def _days_in_month(year: int, month: int) -> int: if month == 12: next_month = datetime(year + 1, 1, 1) else: next_month = datetime(year, month + 1, 1) this_month = datetime(year, month, 1) return (next_month - this_month).days def _parse_ics_datetime(key: str, value: str) -> datetime | None: try: if "VALUE=DATE" in key: parsed_date = datetime.strptime(value, "%Y%m%d").date() return datetime.combine(parsed_date, time.min, tzinfo=UTC) if value.endswith("Z"): return datetime.strptime(value, "%Y%m%dT%H%M%SZ").replace(tzinfo=UTC) if "T" in value: return _with_ics_timezone(datetime.strptime(value, "%Y%m%dT%H%M%S"), key) parsed_date = datetime.strptime(value, "%Y%m%d").date() return datetime.combine(parsed_date, time.min, tzinfo=UTC) except ValueError: return None def _with_ics_timezone(value: datetime, key: str) -> datetime: timezone_name = _ics_timezone_name(key) if not timezone_name: return value.replace(tzinfo=UTC) try: return value.replace(tzinfo=ZoneInfo(timezone_name)) except ZoneInfoNotFoundError: return value.replace(tzinfo=UTC) def _ics_timezone_name(key: str) -> str | None: for segment in key.split(";")[1:]: if segment.startswith("TZID="): return segment.split("=", 1)[1].strip() or None return None def _decode_ics_text(value: str) -> str: return ( value.replace("\\n", "\n") .replace("\\N", "\n") .replace("\\,", ",") .replace("\\;", ";") .replace("\\\\", "\\") )