Build Forgejo-backed community prototype

This commit is contained in:
kacper 2026-04-12 20:15:33 -04:00
parent 797ae5ea35
commit 6671a01d26
16 changed files with 2485 additions and 293 deletions

View file

@ -1,11 +1,15 @@
from __future__ import annotations
from dataclasses import dataclass
from datetime import UTC, date, datetime, time
from datetime import UTC, datetime, time, timedelta
from zoneinfo import ZoneInfo, ZoneInfoNotFoundError
from urllib.parse import urlparse
import httpx
MAX_RECURRENCE_OCCURRENCES = 120
RECURRENCE_LOOKAHEAD_DAYS = 365
class CalendarFeedError(RuntimeError):
pass
@ -29,7 +33,10 @@ class CalendarFeed:
async def fetch_calendar_feed(url: str, timeout_seconds: float) -> CalendarFeed:
normalized_url = _normalize_calendar_url(url)
async with httpx.AsyncClient(timeout=timeout_seconds, follow_redirects=True) as client:
response = await client.get(normalized_url)
response = await client.get(
normalized_url,
headers={"User-Agent": "RobotUCalendar/0.1 (+https://aksal.cloud)"},
)
if not response.is_success:
raise CalendarFeedError(f"{response.status_code} from calendar feed {normalized_url}")
@ -55,6 +62,7 @@ def _parse_ics(raw_text: str, feed_url: str) -> tuple[str, list[CalendarEvent]]:
lines = _unfold_ics_lines(raw_text)
calendar_name = _calendar_name(lines, feed_url)
now = datetime.now(UTC)
recurrence_until = now + timedelta(days=RECURRENCE_LOOKAHEAD_DAYS)
events: list[CalendarEvent] = []
current_event: dict[str, str] | None = None
@ -63,9 +71,14 @@ def _parse_ics(raw_text: str, feed_url: str) -> tuple[str, list[CalendarEvent]]:
current_event = {}
continue
if line == "END:VEVENT":
parsed_event = _event_from_properties(current_event or {}, calendar_name)
if parsed_event and parsed_event.starts_at >= now:
events.append(parsed_event)
events.extend(
_events_from_properties(
current_event or {},
calendar_name,
now=now,
recurrence_until=recurrence_until,
),
)
current_event = None
continue
if current_event is None or ":" not in line:
@ -105,23 +118,139 @@ def _calendar_host(feed_url: str) -> str:
return parsed.hostname or "Calendar"
def _event_from_properties(properties: dict[str, str], calendar_name: str) -> CalendarEvent | None:
def _events_from_properties(
properties: dict[str, str],
calendar_name: str,
*,
now: datetime,
recurrence_until: datetime,
) -> list[CalendarEvent]:
title = _decode_ics_text(properties.get("SUMMARY", "").strip())
start_key = next((key for key in properties if key.startswith("DTSTART")), None)
if not title or not start_key:
return None
return []
starts_at = _parse_ics_datetime(start_key, properties[start_key])
if starts_at is None:
return None
return []
location = _decode_ics_text(properties.get("LOCATION", "").strip())
return CalendarEvent(
title=title,
starts_at=starts_at,
source=calendar_name,
mode=location or "Calendar",
event_template = {
"title": title,
"source": calendar_name,
"mode": location or "Calendar",
}
recurrence_rule = properties.get("RRULE")
if not recurrence_rule:
if starts_at >= now:
return [CalendarEvent(starts_at=starts_at, **event_template)]
return []
return [
CalendarEvent(starts_at=occurrence, **event_template)
for occurrence in _recurrence_occurrences(
starts_at,
recurrence_rule,
now=now,
recurrence_until=recurrence_until,
start_key=start_key,
)
]
def _recurrence_occurrences(
starts_at: datetime,
raw_rule: str,
*,
now: datetime,
recurrence_until: datetime,
start_key: str,
) -> list[datetime]:
rule = _parse_recurrence_rule(raw_rule)
frequency = rule.get("FREQ", "").upper()
interval = _positive_int(rule.get("INTERVAL"), default=1)
count = _positive_int(rule.get("COUNT"), default=MAX_RECURRENCE_OCCURRENCES)
until = _parse_recurrence_until(rule.get("UNTIL"), start_key)
effective_until = min(
[candidate for candidate in (until, recurrence_until) if candidate is not None],
default=recurrence_until,
)
occurrences: list[datetime] = []
occurrence = starts_at
for index in range(count):
if occurrence > effective_until:
break
if occurrence >= now:
occurrences.append(occurrence)
if len(occurrences) >= MAX_RECURRENCE_OCCURRENCES:
break
next_occurrence = _next_recurrence_occurrence(occurrence, frequency, interval)
if next_occurrence is None or next_occurrence <= occurrence:
break
occurrence = next_occurrence
return occurrences
def _parse_recurrence_rule(raw_rule: str) -> dict[str, str]:
rule: dict[str, str] = {}
for segment in raw_rule.split(";"):
if "=" not in segment:
continue
key, value = segment.split("=", 1)
rule[key.strip().upper()] = value.strip()
return rule
def _positive_int(value: str | None, *, default: int) -> int:
if value is None:
return default
try:
parsed = int(value)
except ValueError:
return default
return parsed if parsed > 0 else default
def _parse_recurrence_until(value: str | None, start_key: str) -> datetime | None:
if not value:
return None
return _parse_ics_datetime(start_key, value)
def _next_recurrence_occurrence(
occurrence: datetime,
frequency: str,
interval: int,
) -> datetime | None:
if frequency == "DAILY":
return occurrence + timedelta(days=interval)
if frequency == "WEEKLY":
return occurrence + timedelta(weeks=interval)
if frequency == "MONTHLY":
return _add_months(occurrence, interval)
if frequency == "YEARLY":
return _add_months(occurrence, interval * 12)
return None
def _add_months(value: datetime, months: int) -> datetime:
month_index = value.month - 1 + months
year = value.year + month_index // 12
month = month_index % 12 + 1
day = min(value.day, _days_in_month(year, month))
return value.replace(year=year, month=month, day=day)
def _days_in_month(year: int, month: int) -> int:
if month == 12:
next_month = datetime(year + 1, 1, 1)
else:
next_month = datetime(year, month + 1, 1)
this_month = datetime(year, month, 1)
return (next_month - this_month).days
def _parse_ics_datetime(key: str, value: str) -> datetime | None:
@ -132,13 +261,31 @@ def _parse_ics_datetime(key: str, value: str) -> datetime | None:
if value.endswith("Z"):
return datetime.strptime(value, "%Y%m%dT%H%M%SZ").replace(tzinfo=UTC)
if "T" in value:
return datetime.strptime(value, "%Y%m%dT%H%M%S").replace(tzinfo=UTC)
return _with_ics_timezone(datetime.strptime(value, "%Y%m%dT%H%M%S"), key)
parsed_date = datetime.strptime(value, "%Y%m%d").date()
return datetime.combine(parsed_date, time.min, tzinfo=UTC)
except ValueError:
return None
def _with_ics_timezone(value: datetime, key: str) -> datetime:
timezone_name = _ics_timezone_name(key)
if not timezone_name:
return value.replace(tzinfo=UTC)
try:
return value.replace(tzinfo=ZoneInfo(timezone_name))
except ZoneInfoNotFoundError:
return value.replace(tzinfo=UTC)
def _ics_timezone_name(key: str) -> str | None:
for segment in key.split(";")[1:]:
if segment.startswith("TZID="):
return segment.split("=", 1)[1].strip() or None
return None
def _decode_ics_text(value: str) -> str:
return (
value.replace("\\n", "\n")