Initial Robot U site prototype
This commit is contained in:
commit
fe19f200d7
27 changed files with 3677 additions and 0 deletions
149
calendar_feeds.py
Normal file
149
calendar_feeds.py
Normal file
|
|
@ -0,0 +1,149 @@
|
|||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from datetime import UTC, date, datetime, time
|
||||
from urllib.parse import urlparse
|
||||
|
||||
import httpx
|
||||
|
||||
|
||||
class CalendarFeedError(RuntimeError):
|
||||
pass
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class CalendarEvent:
|
||||
title: str
|
||||
starts_at: datetime
|
||||
source: str
|
||||
mode: str
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class CalendarFeed:
|
||||
url: str
|
||||
source_name: str
|
||||
events: list[CalendarEvent]
|
||||
|
||||
|
||||
async def fetch_calendar_feed(url: str, timeout_seconds: float) -> CalendarFeed:
|
||||
normalized_url = _normalize_calendar_url(url)
|
||||
async with httpx.AsyncClient(timeout=timeout_seconds, follow_redirects=True) as client:
|
||||
response = await client.get(normalized_url)
|
||||
|
||||
if not response.is_success:
|
||||
raise CalendarFeedError(f"{response.status_code} from calendar feed {normalized_url}")
|
||||
|
||||
calendar_name, events = _parse_ics(response.text, normalized_url)
|
||||
return CalendarFeed(url=normalized_url, source_name=calendar_name, events=events)
|
||||
|
||||
|
||||
def _normalize_calendar_url(raw_url: str) -> str:
|
||||
value = raw_url.strip()
|
||||
if not value:
|
||||
raise CalendarFeedError("Calendar feed URL is empty.")
|
||||
|
||||
parsed = urlparse(value)
|
||||
if parsed.scheme == "webcal":
|
||||
return parsed._replace(scheme="https").geturl()
|
||||
if parsed.scheme in {"http", "https"}:
|
||||
return value
|
||||
raise CalendarFeedError(f"Unsupported calendar feed scheme: {parsed.scheme or 'missing'}")
|
||||
|
||||
|
||||
def _parse_ics(raw_text: str, feed_url: str) -> tuple[str, list[CalendarEvent]]:
|
||||
lines = _unfold_ics_lines(raw_text)
|
||||
calendar_name = _calendar_name(lines, feed_url)
|
||||
now = datetime.now(UTC)
|
||||
events: list[CalendarEvent] = []
|
||||
|
||||
current_event: dict[str, str] | None = None
|
||||
for line in lines:
|
||||
if line == "BEGIN:VEVENT":
|
||||
current_event = {}
|
||||
continue
|
||||
if line == "END:VEVENT":
|
||||
parsed_event = _event_from_properties(current_event or {}, calendar_name)
|
||||
if parsed_event and parsed_event.starts_at >= now:
|
||||
events.append(parsed_event)
|
||||
current_event = None
|
||||
continue
|
||||
if current_event is None or ":" not in line:
|
||||
continue
|
||||
|
||||
raw_key, value = line.split(":", 1)
|
||||
current_event[raw_key] = value.strip()
|
||||
|
||||
events.sort(key=lambda event: event.starts_at)
|
||||
return calendar_name, events
|
||||
|
||||
|
||||
def _unfold_ics_lines(raw_text: str) -> list[str]:
|
||||
lines = raw_text.replace("\r\n", "\n").replace("\r", "\n").split("\n")
|
||||
unfolded: list[str] = []
|
||||
|
||||
for line in lines:
|
||||
if not line:
|
||||
continue
|
||||
if unfolded and line[:1] in {" ", "\t"}:
|
||||
unfolded[-1] = f"{unfolded[-1]}{line[1:]}"
|
||||
else:
|
||||
unfolded.append(line)
|
||||
|
||||
return unfolded
|
||||
|
||||
|
||||
def _calendar_name(lines: list[str], feed_url: str) -> str:
|
||||
for line in lines:
|
||||
if line.startswith("X-WR-CALNAME:"):
|
||||
return _decode_ics_text(line.split(":", 1)[1].strip()) or _calendar_host(feed_url)
|
||||
return _calendar_host(feed_url)
|
||||
|
||||
|
||||
def _calendar_host(feed_url: str) -> str:
|
||||
parsed = urlparse(feed_url)
|
||||
return parsed.hostname or "Calendar"
|
||||
|
||||
|
||||
def _event_from_properties(properties: dict[str, str], calendar_name: str) -> CalendarEvent | None:
|
||||
title = _decode_ics_text(properties.get("SUMMARY", "").strip())
|
||||
start_key = next((key for key in properties if key.startswith("DTSTART")), None)
|
||||
if not title or not start_key:
|
||||
return None
|
||||
|
||||
starts_at = _parse_ics_datetime(start_key, properties[start_key])
|
||||
if starts_at is None:
|
||||
return None
|
||||
|
||||
location = _decode_ics_text(properties.get("LOCATION", "").strip())
|
||||
return CalendarEvent(
|
||||
title=title,
|
||||
starts_at=starts_at,
|
||||
source=calendar_name,
|
||||
mode=location or "Calendar",
|
||||
)
|
||||
|
||||
|
||||
def _parse_ics_datetime(key: str, value: str) -> datetime | None:
|
||||
try:
|
||||
if "VALUE=DATE" in key:
|
||||
parsed_date = datetime.strptime(value, "%Y%m%d").date()
|
||||
return datetime.combine(parsed_date, time.min, tzinfo=UTC)
|
||||
if value.endswith("Z"):
|
||||
return datetime.strptime(value, "%Y%m%dT%H%M%SZ").replace(tzinfo=UTC)
|
||||
if "T" in value:
|
||||
return datetime.strptime(value, "%Y%m%dT%H%M%S").replace(tzinfo=UTC)
|
||||
parsed_date = datetime.strptime(value, "%Y%m%d").date()
|
||||
return datetime.combine(parsed_date, time.min, tzinfo=UTC)
|
||||
except ValueError:
|
||||
return None
|
||||
|
||||
|
||||
def _decode_ics_text(value: str) -> str:
|
||||
return (
|
||||
value.replace("\\n", "\n")
|
||||
.replace("\\N", "\n")
|
||||
.replace("\\,", ",")
|
||||
.replace("\\;", ";")
|
||||
.replace("\\\\", "\\")
|
||||
)
|
||||
Loading…
Add table
Add a link
Reference in a new issue