# robot-u-site/live_prototype.py
from __future__ import annotations

import asyncio
import re
from typing import Any
from urllib.parse import quote, unquote, urlparse

from calendar_feeds import CalendarFeed, CalendarFeedError, fetch_calendar_feed
from forgejo_client import ForgejoClient, ForgejoClientError
from settings import Settings
async def build_live_prototype_payload(
    settings: Settings,
    *,
    forgejo_token: str | None = None,
    auth_source: str = "none",
    session_user: dict[str, Any] | None = None,
) -> dict[str, object]:
    """Build the prototype page payload from live Forgejo and calendar data.

    Args:
        settings: Application settings; supplies the Forgejo base URL, the
            server-side token, the calendar feed URLs and the issue/event
            limits read below.
        forgejo_token: Token used in preference to ``settings.forgejo_token``.
        auth_source: Origin of the token. ``"authorization"`` and
            ``"session"`` mark it as a user token.
        session_user: User record taken from the caller's session, if any.

    Returns:
        A dict with the ``hero``, ``auth``, ``source_of_truth``,
        ``discussion_settings``, ``featured_courses``, ``recent_posts``,
        ``upcoming_events``, ``recent_discussions`` and
        ``implementation_notes`` sections. When repository search fails, the
        reduced payload produced by ``_empty_payload`` is returned instead.
    """
    notices: list[str] = []
    token = forgejo_token or settings.forgejo_token
    user_scoped = auth_source in {"authorization", "session"} and bool(token)

    cards = [
        {
            "title": "Forgejo base URL",
            "description": settings.forgejo_base_url,
        },
        {
            "title": "Access mode",
            "description": _access_mode_description(token, auth_source),
        },
    ]

    # Calendar feeds are loaded first so their warnings precede Forgejo's.
    feeds = await _load_calendar_feeds(settings, notices)
    if settings.calendar_feed_urls:
        cards.append(
            {
                "title": "Calendar feeds",
                "description": f"{len(feeds)} configured feed(s)",
            },
        )

    shown_source = _display_auth_source(auth_source, session_user)

    async with ForgejoClient(settings, forgejo_token=token) as client:
        # OIDC metadata is optional: a failure only produces a warning.
        try:
            oidc = await client.fetch_openid_configuration()
        except ForgejoClientError as error:
            notices.append(str(error))
            oidc = {}
        cards.append(
            {
                "title": "OIDC issuer",
                "description": str(oidc.get("issuer", "Unavailable")),
            },
        )

        # Repository search is required: without it the reduced payload is returned.
        try:
            repos = await client.search_repositories()
        except ForgejoClientError as error:
            notices.append(str(error))
            cards.append(
                {
                    "title": "Discovery state",
                    "description": "Forgejo connection exists, but live repo discovery failed.",
                },
            )
            return _empty_payload(
                source_cards=cards,
                warnings=notices,
                auth=_auth_payload(session_user, shown_source, settings),
                hero_summary=(
                    "The backend reached aksal.cloud, but the configured token could not complete "
                    "the public repo discovery flow."
                ),
            )

        repos = await _with_configured_discussion_repo(client, repos, settings, notices)
        api_user = await _current_user_for_auth_source(client, user_scoped, notices)
        visible_repos = [
            repo for repo in repos if not (repo.get("fork") or repo.get("private"))
        ]

        summaries = await asyncio.gather(
            *(_summarize_repo(client, repo) for repo in visible_repos),
        )
        content = [summary for summary in summaries if summary is not None]
        courses = [summary for summary in content if summary["lesson_count"] > 0]
        post_sources = [summary for summary in content if summary["blog_count"] > 0]

        # Newest posts first, ordered by the stringified update stamp.
        posts = [post for summary in post_sources for post in summary["blog_posts"]]
        posts.sort(key=lambda post: str(post.get("updated_at", "")), reverse=True)

        issues = await _recent_public_issues(
            client,
            visible_repos,
            settings.forgejo_recent_issue_limit,
        )

        if api_user is not None:
            cards.append(
                {
                    "title": "Signed-in API identity",
                    "description": str(api_user.get("login", "Unknown user")),
                },
            )
        cards.append(
            {
                "title": "Discovery state",
                "description": (
                    f"Detected {len(courses)} course repos, {len(post_sources)} post repos, "
                    f"and {len(issues)} recent public issues."
                ),
            },
        )

        # With a user token the session user wins, falling back to the API identity.
        payload_user = (session_user or api_user) if user_scoped else session_user

        payload: dict[str, object] = {
            "hero": {
                "eyebrow": "Live Forgejo integration",
                "title": "Robot U is reading from your aksal.cloud Forgejo instance.",
                "summary": (
                    "This prototype now uses the real Forgejo base URL, OIDC metadata, visible repos, "
                    "and recent issues available to the active token."
                ),
                "highlights": [
                    "Repo discovery filters to public, non-fork repositories only",
                    "Course repos are detected from /lessons/, post repos from /blogs/",
                    "Recent discussions are loaded from live Forgejo issues",
                ],
            },
            "auth": _auth_payload(payload_user, shown_source, settings),
            "source_of_truth": cards,
            "discussion_settings": _discussion_settings(settings),
            "featured_courses": [_course_card(summary) for summary in courses],
            "recent_posts": [_post_card(post) for post in posts],
            "upcoming_events": _event_cards(feeds, settings.calendar_event_limit),
        }
        # Comments are fetched while the client is still open.
        payload["recent_discussions"] = await asyncio.gather(
            *(_discussion_card(client, issue) for issue in issues),
        )
        payload["implementation_notes"] = [
            "Live repo discovery is now driven by the Forgejo API instead of mock content.",
            "Issues shown here are loaded only from public Forgejo repositories.",
            *notices,
        ]
        return payload
def _access_mode_description(access_token: str | None, auth_source: str) -> str:
if auth_source in {"authorization", "session"} and access_token:
return f"Authenticated through {auth_source} token."
if auth_source == "server" or access_token:
return "Reading public content through the server-side Forgejo token."
return "Reading public content anonymously."
async def _current_user_for_auth_source(
client: ForgejoClient,
has_user_token: bool,
warnings: list[str],
) -> dict[str, Any] | None:
if not has_user_token:
return None
try:
return await client.fetch_current_user()
except ForgejoClientError as error:
warnings.append(str(error))
return None
2026-04-13 18:19:50 -04:00
async def _with_configured_discussion_repo(
    client: ForgejoClient,
    repos: list[dict[str, Any]],
    settings: Settings,
    warnings: list[str],
) -> list[dict[str, Any]]:
    """Ensure the configured general-discussion repo is part of ``repos``.

    Returns ``repos`` unchanged when nothing valid is configured, when the
    repo is already listed (case-insensitive ``full_name`` match), or when
    fetching it fails (a warning is recorded). Otherwise returns a new list
    with the fetched repo appended.
    """
    configured = _configured_owner_repo(settings.forgejo_general_discussion_repo)
    if configured is None:
        return repos

    owner, repo = configured
    wanted = f"{owner}/{repo}".lower()
    known_names = (str(candidate.get("full_name", "")).lower() for candidate in repos)
    if wanted in known_names:
        return repos

    try:
        fetched = await client.fetch_repository(owner, repo)
    except ForgejoClientError as error:
        warnings.append(f"General discussion repo could not be loaded: {error}")
        return repos
    return [*repos, fetched]
def _configured_owner_repo(value: str | None) -> tuple[str, str] | None:
if not value:
return None
owner, separator, repo = value.strip().partition("/")
if not separator or not owner or not repo or "/" in repo:
return None
return owner, repo
def _discussion_settings(settings: Settings) -> dict[str, object]:
    """Report whether a valid general-discussion repo is configured."""
    configured = _configured_owner_repo(settings.forgejo_general_discussion_repo)
    return _discussion_settings_from_configured(configured is not None)
def _discussion_settings_from_configured(general_discussion_configured: bool) -> dict[str, object]:
return {"general_discussion_configured": general_discussion_configured}
2026-04-08 06:03:48 -04:00
async def _summarize_repo(
    client: ForgejoClient,
    repo: dict[str, Any],
) -> dict[str, Any] | None:
    """Summarize the course and blog content found in one repository.

    A repository counts as content when its root holds a ``lessons`` and/or
    ``blogs`` directory. Chapters are the directories under ``lessons/``,
    lessons the directories under each chapter, and blog posts the
    directories under ``blogs/``.

    Args:
        client: Forgejo client used for directory listings.
        repo: Repository record; ``owner``, ``name``, ``default_branch``,
            ``html_url``, ``full_name``, ``description`` and ``updated_at``
            are read.

    Returns:
        The summary dict, or ``None`` when the repo has no usable
        owner/name, has neither content directory, or any of its directory
        listings fails. Listing failures are handled the same way at every
        level so one broken repository cannot abort the whole gather in the
        caller.
    """
    owner_login = _repo_owner_login(repo)
    repo_name = repo.get("name")
    if not isinstance(owner_login, str) or not isinstance(repo_name, str):
        return None

    default_branch = str(repo.get("default_branch") or "main")
    # ``or ""`` keeps a null html_url from becoming the literal string "None".
    repo_html_url = str(repo.get("html_url") or "")
    full_name = repo.get("full_name", f"{owner_login}/{repo_name}")

    lesson_count = 0
    course_outline: list[dict[str, object]] = []
    blog_posts: list[dict[str, object]] = []
    try:
        root_entries = await client.list_directory(owner_login, repo_name)
        top_level_dirs = {entry["name"] for entry in _sorted_dir_entries(root_entries)}
        if not top_level_dirs & {"lessons", "blogs"}:
            return None
        if "lessons" in top_level_dirs:
            course_outline, lesson_count = await _collect_course_outline(
                client,
                owner_login,
                repo_name,
                default_branch,
                repo_html_url,
            )
        if "blogs" in top_level_dirs:
            blog_posts = await _collect_blog_posts(
                client,
                owner_login,
                repo_name,
                str(full_name),
                str(repo.get("description") or ""),
                str(repo.get("updated_at", "")),
                default_branch,
                repo_html_url,
            )
    except ForgejoClientError:
        return None

    return {
        "name": repo_name,
        "owner": owner_login,
        "full_name": full_name,
        "html_url": repo.get("html_url", ""),
        "description": repo.get("description") or "No repository description yet.",
        "lesson_count": lesson_count,
        # One outline entry is produced per chapter directory.
        "chapter_count": len(course_outline),
        # One post summary is produced per blog directory.
        "blog_count": len(blog_posts),
        "blog_posts": blog_posts,
        "updated_at": repo.get("updated_at", ""),
        "course_outline": course_outline,
    }


async def _collect_course_outline(
    client: ForgejoClient,
    owner: str,
    repo: str,
    default_branch: str,
    repo_html_url: str,
) -> tuple[list[dict[str, object]], int]:
    """Return the chapter outline under ``lessons/`` and the total lesson count."""
    chapter_dirs = _sorted_dir_entries(await client.list_directory(owner, repo, "lessons"))
    chapter_listings = await asyncio.gather(
        *[
            client.list_directory(owner, repo, f"lessons/{chapter['name']}")
            for chapter in chapter_dirs
        ],
    )

    outline: list[dict[str, object]] = []
    lesson_total = 0
    # Both sequences derive from chapter_dirs, so strict=True documents the invariant.
    for chapter, listing in zip(chapter_dirs, chapter_listings, strict=True):
        chapter_name = str(chapter["name"])
        lesson_dirs = _sorted_dir_entries(listing)
        # Count exactly the lessons that appear in the outline.
        lesson_total += len(lesson_dirs)
        lessons = await asyncio.gather(
            *[
                _summarize_lesson(
                    client,
                    owner,
                    repo,
                    default_branch,
                    repo_html_url,
                    chapter_name,
                    str(lesson_dir["name"]),
                )
                for lesson_dir in lesson_dirs
            ],
        )
        outline.append(
            {
                "slug": chapter_name,
                "title": _display_name(chapter_name),
                "lessons": lessons,
            },
        )
    return outline, lesson_total


async def _collect_blog_posts(
    client: ForgejoClient,
    owner: str,
    repo: str,
    full_name: str,
    repo_description: str,
    updated_at: str,
    default_branch: str,
    repo_html_url: str,
) -> list[dict[str, object]]:
    """Return one post summary per directory under ``blogs/``, in name order."""
    blog_dirs = _sorted_dir_entries(await client.list_directory(owner, repo, "blogs"))
    return await asyncio.gather(
        *[
            _summarize_blog_post(
                client,
                owner,
                repo,
                full_name,
                repo_description,
                updated_at,
                default_branch,
                repo_html_url,
                str(blog_dir["name"]),
            )
            for blog_dir in blog_dirs
        ],
    )
def _course_card(summary: dict[str, Any]) -> dict[str, object]:
return {
"title": summary["name"],
"owner": summary["owner"],
"name": summary["name"],
"repo": summary["full_name"],
"html_url": summary["html_url"],
"lessons": summary["lesson_count"],
"chapters": summary["chapter_count"],
"summary": summary["description"],
"status": "Live course repo",
"outline": summary["course_outline"],
"updated_at": summary["updated_at"],
2026-04-08 06:03:48 -04:00
}
2026-04-12 20:23:05 -04:00
def _post_card(post: dict[str, Any]) -> dict[str, object]:
2026-04-08 06:03:48 -04:00
return {
2026-04-12 20:23:05 -04:00
"title": post["title"],
"owner": post["owner"],
"name": post["name"],
"repo": post["repo"],
"slug": post["slug"],
"kind": "Blog post",
"summary": post["summary"],
"path": post["path"],
"file_path": post["file_path"],
"html_url": post["html_url"],
2026-04-13 18:19:50 -04:00
"raw_base_url": post["raw_base_url"],
"assets": post["assets"],
2026-04-12 20:23:05 -04:00
"body": post["body"],
"updated_at": post["updated_at"],
2026-04-08 06:03:48 -04:00
}
async def _recent_public_issues(
client: ForgejoClient,
repos: list[dict[str, Any]],
limit: int,
) -> list[dict[str, Any]]:
issue_lists = await asyncio.gather(
*[_repo_issues(client, repo, limit) for repo in repos],
)
issues = [issue for issue_list in issue_lists for issue in issue_list]
return sorted(issues, key=lambda issue: str(issue.get("updated_at", "")), reverse=True)[:limit]
async def _repo_issues(
    client: ForgejoClient,
    repo: dict[str, Any],
    limit: int,
) -> list[dict[str, Any]]:
    """List up to ``limit`` issues of one repo, each tagged with its repository.

    Returns an empty list when the repo has no usable owner/name or when the
    client raises ``ForgejoClientError``.
    """
    owner_login = _repo_owner_login(repo)
    repo_name = repo.get("name")
    if not (isinstance(owner_login, str) and isinstance(repo_name, str)):
        return []

    try:
        fetched = await client.list_repo_issues(owner_login, repo_name, limit=limit)
    except ForgejoClientError:
        return []

    tagged: list[dict[str, Any]] = []
    for issue in fetched:
        tagged.append(_with_repository(issue, repo, owner_login, repo_name))
    return tagged
def _with_repository(
issue: dict[str, Any],
repo: dict[str, Any],
owner_login: str,
repo_name: str,
) -> dict[str, Any]:
issue_with_repo = dict(issue)
issue_with_repo["repository"] = {
"owner": owner_login,
"name": repo_name,
"full_name": repo.get("full_name", f"{owner_login}/{repo_name}"),
"private": False,
}
return issue_with_repo
def _repo_owner_login(repo: dict[str, Any]) -> str | None:
owner = repo.get("owner", {})
if isinstance(owner, dict) and isinstance(owner.get("login"), str):
return owner["login"]
if isinstance(owner, str):
return owner
return None
2026-04-08 06:03:48 -04:00
def _event_cards(calendar_feeds: list[CalendarFeed], limit: int) -> list[dict[str, object]]:
upcoming_events = sorted(
[event for feed in calendar_feeds for event in feed.events],
key=lambda event: event.starts_at,
)[:limit]
return [
{
"title": event.title,
"when": _format_event_datetime(event.starts_at),
"source": event.source,
"mode": event.mode,
}
for event in upcoming_events
]
async def _discussion_card(client: ForgejoClient, issue: dict[str, Any]) -> dict[str, object]:
    """Build a discussion card for ``issue``, including its comments.

    Comments are requested only when the issue carries a ``repository``
    record with string ``owner`` and ``name`` and a positive issue number.
    Loading them is best-effort: a ``ForgejoClientError`` yields a card
    without comments rather than an error.
    """
    repository = issue.get("repository") or {}
    owner = repository.get("owner", "")
    repo_name = repository.get("name")
    # ``or 0`` tolerates a null number, matching discussion_card_from_issue.
    issue_number = int(issue.get("number") or 0)

    comment_items: list[dict[str, object]] = []
    if isinstance(owner, str) and isinstance(repo_name, str) and issue_number > 0:
        try:
            raw_comments = await client.list_issue_comments(owner, repo_name, issue_number)
        except ForgejoClientError:
            raw_comments = []
        comment_items = [_discussion_reply(comment) for comment in raw_comments]

    return discussion_card_from_issue(issue, comments=comment_items)
def discussion_card_from_issue(
    issue: dict[str, Any],
    *,
    comments: list[dict[str, object]] | None = None,
) -> dict[str, object]:
    """Convert a Forgejo issue record into a discussion card.

    Args:
        issue: Issue record. ``repository``, ``user``, ``labels``, ``body``,
            ``id``, ``number``, ``comments``, ``title``, ``state``,
            ``updated_at`` and ``html_url`` are read. Missing or null values
            fall back to defaults instead of raising.
        comments: Reply cards already prepared for this issue.

    Returns:
        The card dict. ``context`` is ``"Linked discussion"`` when the body
        references a post or lesson, ``"Live Forgejo issue"`` otherwise.
    """
    repository = issue.get("repository") or {}
    issue_author = issue.get("user") or {}
    # ``or []`` covers an explicit null labels field as well as a missing one.
    labels = [
        label["name"]
        for label in issue.get("labels") or []
        if isinstance(label, dict) and isinstance(label.get("name"), str)
    ]

    body = str(issue.get("body", "") or "").strip()
    # Links are extracted from the real body, before the placeholder is substituted.
    links = discussion_links_from_text(body)
    if not body:
        body = "No issue description yet. Right now the conversation starts in the replies."

    return {
        "id": int(issue.get("id") or 0),
        "title": issue.get("title", "Untitled issue"),
        "repo": repository.get("full_name", "Unknown repo"),
        "replies": int(issue.get("comments", 0) or 0),
        "context": "Linked discussion" if links else "Live Forgejo issue",
        "author": issue_author.get("login", "Unknown author"),
        "author_avatar_url": issue_author.get("avatar_url", ""),
        "state": issue.get("state", "open"),
        "body": body,
        "number": int(issue.get("number") or 0),
        "updated_at": issue.get("updated_at", ""),
        "html_url": issue.get("html_url", ""),
        "labels": labels,
        "comments": comments or [],
        "links": links,
    }
def discussion_links_from_text(text: str) -> list[dict[str, object]]:
    """Find references to posts and lessons inside free text.

    Three kinds of reference are recognised, in this order: ``/posts/...``
    paths, ``/courses/.../lessons/...`` paths (each optionally preceded by an
    http(s) origin), and absolute URLs accepted by ``_forgejo_file_link``.
    Duplicates are dropped by ``_append_discussion_link``.
    """
    found: list[dict[str, object]] = []
    seen: set[tuple[str, str, str, str]] = set()

    post_pattern = r"(?:https?://[^\s)]+)?(/posts/([^/\s)]+)/([^/\s)]+)/([^/\s)#?]+))"
    for hit in re.finditer(post_pattern, text):
        owner, repo = unquote(hit.group(2)), unquote(hit.group(3))
        # Trailing sentence punctuation is not part of the slug.
        slug = unquote(hit.group(4).rstrip(".,"))
        post_link: dict[str, object] = {
            "kind": "post",
            "path": f"/posts/{owner}/{repo}/{slug}",
            "owner": owner,
            "repo": repo,
            "slug": slug,
            "content_path": f"blogs/{slug}",
        }
        _append_discussion_link(found, seen, post_link)

    lesson_pattern = (
        r"(?:https?://[^\s)]+)?"
        r"(/courses/([^/\s)]+)/([^/\s)]+)/lessons/([^/\s)]+)/([^/\s)#?]+))"
    )
    for hit in re.finditer(lesson_pattern, text):
        owner, repo = unquote(hit.group(2)), unquote(hit.group(3))
        chapter = unquote(hit.group(4))
        lesson = unquote(hit.group(5).rstrip(".,"))
        lesson_link: dict[str, object] = {
            "kind": "lesson",
            "path": f"/courses/{owner}/{repo}/lessons/{chapter}/{lesson}",
            "owner": owner,
            "repo": repo,
            "chapter": chapter,
            "lesson": lesson,
            "content_path": f"lessons/{chapter}/{lesson}",
        }
        _append_discussion_link(found, seen, lesson_link)

    for candidate_url in re.findall(r"https?://[^\s)]+", text):
        file_link = _forgejo_file_link(candidate_url)
        if file_link is None:
            continue
        _append_discussion_link(found, seen, file_link)

    return found
def _append_discussion_link(
links: list[dict[str, object]],
seen: set[tuple[str, str, str, str]],
link: dict[str, object],
) -> None:
key = (
str(link.get("kind", "")),
str(link.get("owner", "")),
str(link.get("repo", "")),
str(link.get("content_path", "")),
)
if key in seen:
return
seen.add(key)
links.append(link)
def _forgejo_file_link(raw_url: str) -> dict[str, object] | None:
parsed = urlparse(raw_url.rstrip(".,"))
path_parts = [unquote(part) for part in parsed.path.strip("/").split("/") if part]
if len(path_parts) < 6 or path_parts[2:4] != ["src", "branch"]:
return None
owner, repo = path_parts[0], path_parts[1]
content_parts = path_parts[5:]
if len(content_parts) < 2:
return None
if content_parts[0] == "blogs":
slug = content_parts[1]
return {
"kind": "post",
"path": f"/posts/{owner}/{repo}/{slug}",
"owner": owner,
"repo": repo,
"slug": slug,
"content_path": f"blogs/{slug}",
}
if content_parts[0] == "lessons" and len(content_parts) >= 3:
chapter = content_parts[1]
lesson = content_parts[2]
return {
"kind": "lesson",
"path": f"/courses/{owner}/{repo}/lessons/{chapter}/{lesson}",
"owner": owner,
"repo": repo,
"chapter": chapter,
"lesson": lesson,
"content_path": f"lessons/{chapter}/{lesson}",
}
return None
2026-04-08 06:03:48 -04:00
def _discussion_reply(comment: dict[str, Any]) -> dict[str, object]:
author = comment.get("user") or {}
body = str(comment.get("body", "") or "").strip()
if not body:
body = "No comment body provided."
return {
"id": int(comment.get("id", 0)),
"author": author.get("login", "Unknown author"),
"avatar_url": author.get("avatar_url", ""),
"body": body,
"created_at": comment.get("created_at", ""),
"html_url": comment.get("html_url", ""),
}
def _display_auth_source(auth_source: str, session_user: dict[str, Any] | None) -> str:
if session_user:
return "session"
return auth_source
2026-04-08 06:03:48 -04:00
def _empty_payload(
    *,
    source_cards: list[dict[str, str]],
    warnings: list[str],
    auth: dict[str, object],
    hero_summary: str,
) -> dict[str, object]:
    """Build the payload used when no live content could be discovered.

    All content sections are empty, and ``discussion_settings`` reports the
    general discussion repo as not configured. ``implementation_notes`` is
    ``warnings`` itself when non-empty, otherwise a single default note.
    """
    hero = {
        "eyebrow": "Forgejo connection status",
        "title": "Robot U is configured for aksal.cloud.",
        "summary": hero_summary,
        "highlights": [
            "Forgejo remains the source of truth for content and discussions",
            "The prototype now targets aksal.cloud by default",
            "Public repo discovery works without signing in when Forgejo allows anonymous reads",
        ],
    }
    notes = warnings
    if not notes:
        notes = ["Live repo discovery is ready, but Forgejo did not return public content."]

    payload: dict[str, object] = {
        "hero": hero,
        "auth": auth,
        "source_of_truth": source_cards,
        "discussion_settings": _discussion_settings_from_configured(False),
    }
    for section in ("featured_courses", "recent_posts", "upcoming_events", "recent_discussions"):
        payload[section] = []
    payload["implementation_notes"] = notes
    return payload
def _auth_payload(
user: dict[str, Any] | None,
source: str,
settings: Settings,
) -> dict[str, object]:
oauth_configured = bool(
2026-04-12 22:02:47 -04:00
settings.auth_secret_key
and settings.forgejo_oauth_client_id
and settings.forgejo_oauth_client_secret
)
if not user:
return {
"authenticated": False,
"login": None,
"source": source,
"can_reply": source in {"authorization", "session"},
"oauth_configured": oauth_configured,
}
return {
"authenticated": True,
"login": user.get("login", "Unknown user"),
"source": source,
"can_reply": source in {"authorization", "session"},
"oauth_configured": oauth_configured,
2026-04-08 06:03:48 -04:00
}
2026-04-12 20:23:05 -04:00
async def _summarize_blog_post(
    client: ForgejoClient,
    owner: str,
    repo: str,
    full_name: str,
    repo_description: str,
    updated_at: str,
    default_branch: str,
    repo_html_url: str,
    post_name: str,
) -> dict[str, object]:
    """Summarize the blog post stored in ``blogs/<post_name>``.

    The first markdown file of the folder (in name order) supplies the body
    and front matter. When the folder cannot be listed, holds no markdown
    file, or the file cannot be fetched, a placeholder post with an empty
    body is returned, carrying whatever was discovered up to that point.
    """
    post_path = f"blogs/{post_name}"
    fallback_title = _display_name(post_name)
    raw_base_url = _raw_folder_url(repo_html_url, default_branch, post_path)

    def placeholder(**extra: Any) -> dict[str, object]:
        # Every fallback shares the same identity fields; only the extras vary.
        return _empty_blog_post(
            owner,
            repo,
            full_name,
            post_name,
            fallback_title,
            repo_description,
            updated_at,
            post_path,
            raw_base_url=raw_base_url,
            **extra,
        )

    try:
        folder_entries = await client.list_directory(owner, repo, post_path)
    except ForgejoClientError:
        return placeholder()

    assets = _content_assets(folder_entries, raw_base_url, post_path)
    markdown_candidates = _markdown_file_entries(folder_entries)
    if not markdown_candidates:
        return placeholder(assets=assets)

    primary = markdown_candidates[0]
    markdown_name = str(primary["name"])
    markdown_path = f"{post_path}/{markdown_name}"
    try:
        file_payload = await client.get_file_content(owner, repo, markdown_path)
    except ForgejoClientError:
        return placeholder(
            file_path=markdown_path,
            html_url=str(primary.get("html_url", "")),
            assets=assets,
        )

    metadata, body = _parse_frontmatter(str(file_payload.get("content", "")))
    title = metadata.get("title") or _display_name(markdown_name) or fallback_title
    summary = metadata.get("summary") or repo_description or ""
    return {
        "slug": post_name,
        "title": str(title),
        "owner": owner,
        "name": repo,
        "repo": full_name,
        "summary": str(summary),
        "path": post_path,
        "file_path": str(file_payload.get("path", markdown_path)),
        "html_url": str(file_payload.get("html_url", "")),
        "raw_base_url": raw_base_url,
        "assets": assets,
        "body": body,
        "updated_at": updated_at,
    }
async def _summarize_lesson(
    client: ForgejoClient,
    owner: str,
    repo: str,
    default_branch: str,
    repo_html_url: str,
    chapter_name: str,
    lesson_name: str,
) -> dict[str, object]:
    """Summarize the lesson stored in ``lessons/<chapter_name>/<lesson_name>``.

    The first markdown file of the folder (in name order) supplies the body
    and front matter. When the folder cannot be listed, holds no markdown
    file, or the file cannot be fetched, a placeholder lesson with an empty
    body is returned, carrying whatever was discovered up to that point.
    """
    lesson_path = f"lessons/{chapter_name}/{lesson_name}"
    fallback_title = _display_name(lesson_name)
    raw_base_url = _raw_folder_url(repo_html_url, default_branch, lesson_path)

    def placeholder(**extra: Any) -> dict[str, object]:
        # Every fallback shares slug, title and path; only the extras vary.
        return _empty_lesson(
            lesson_name,
            fallback_title,
            lesson_path,
            raw_base_url=raw_base_url,
            **extra,
        )

    try:
        folder_entries = await client.list_directory(owner, repo, lesson_path)
    except ForgejoClientError:
        return placeholder()

    assets = _content_assets(folder_entries, raw_base_url, lesson_path)
    markdown_candidates = _markdown_file_entries(folder_entries)
    if not markdown_candidates:
        return placeholder(assets=assets)

    primary = markdown_candidates[0]
    markdown_name = str(primary["name"])
    markdown_path = f"{lesson_path}/{markdown_name}"
    try:
        file_payload = await client.get_file_content(owner, repo, markdown_path)
    except ForgejoClientError:
        return placeholder(
            file_path=markdown_path,
            html_url=str(primary.get("html_url", "")),
            assets=assets,
        )

    metadata, body = _parse_frontmatter(str(file_payload.get("content", "")))
    title = metadata.get("title") or _display_name(markdown_name) or fallback_title
    return {
        "slug": lesson_name,
        "title": str(title),
        "summary": str(metadata.get("summary") or ""),
        "path": lesson_path,
        "file_path": str(file_payload.get("path", markdown_path)),
        "html_url": str(file_payload.get("html_url", "")),
        "raw_base_url": raw_base_url,
        "assets": assets,
        "body": body,
    }
def _sorted_dir_entries(entries: list[dict[str, Any]]) -> list[dict[str, Any]]:
return sorted(
[
entry
for entry in entries
if entry.get("type") == "dir" and isinstance(entry.get("name"), str)
],
key=lambda entry: str(entry["name"]),
)
2026-04-12 20:23:05 -04:00
def _markdown_file_entries(entries: list[dict[str, Any]]) -> list[dict[str, Any]]:
return sorted(
[
entry
for entry in entries
if entry.get("type") == "file"
and isinstance(entry.get("name"), str)
and str(entry.get("name", "")).lower().endswith(".md")
],
key=lambda entry: str(entry["name"]),
)
2026-04-13 18:19:50 -04:00
def _content_assets(
entries: list[dict[str, Any]],
raw_base_url: str,
folder_path: str,
) -> list[dict[str, object]]:
assets: list[dict[str, object]] = []
for entry in entries:
if entry.get("type") != "file" or not isinstance(entry.get("name"), str):
continue
name = str(entry["name"])
if name.lower().endswith(".md"):
continue
path = str(entry.get("path") or f"{folder_path}/{name}")
assets.append(
{
"name": name,
"path": path,
"html_url": str(entry.get("html_url", "")),
"download_url": str(entry.get("download_url") or _raw_file_url(raw_base_url, name)),
},
)
return sorted(assets, key=lambda asset: str(asset["name"]))
def _raw_folder_url(repo_html_url: str, default_branch: str, folder_path: str) -> str:
if not repo_html_url:
return ""
branch = default_branch.strip("/") or "main"
return f"{repo_html_url.rstrip('/')}/raw/branch/{branch}/{folder_path.strip('/')}/"
def _raw_file_url(raw_base_url: str, name: str) -> str:
if not raw_base_url:
return ""
return f"{raw_base_url.rstrip('/')}/{name}"
2026-04-08 06:03:48 -04:00
def _display_name(value: str) -> str:
cleaned = value.strip().rsplit(".", 1)[0]
cleaned = cleaned.replace("_", " ").replace("-", " ")
cleaned = " ".join(cleaned.split())
cleaned = cleaned.lstrip("0123456789 ").strip()
return cleaned.title() or value
async def _load_calendar_feeds(settings: Settings, warnings: list[str]) -> list[CalendarFeed]:
if not settings.calendar_feed_urls:
return []
results = await asyncio.gather(
*[
fetch_calendar_feed(url, settings.forgejo_request_timeout_seconds)
for url in settings.calendar_feed_urls
],
return_exceptions=True,
)
feeds: list[CalendarFeed] = []
for url, result in zip(settings.calendar_feed_urls, results, strict=False):
if isinstance(result, CalendarFeed):
feeds.append(result)
continue
if isinstance(result, CalendarFeedError):
warnings.append(str(result))
continue
if isinstance(result, Exception):
warnings.append(f"Calendar feed failed for {url}: {result}")
return feeds
def _format_event_datetime(value: Any) -> str:
timezone_name = value.strftime("%Z") if hasattr(value, "strftime") else ""
suffix = f" {timezone_name}" if timezone_name else ""
return f"{value.strftime('%b %-d, %-I:%M %p')}{suffix}"
2026-04-08 06:03:48 -04:00
def _empty_lesson(
lesson_name: str,
title: str,
lesson_path: str,
*,
file_path: str = "",
html_url: str = "",
2026-04-13 18:19:50 -04:00
raw_base_url: str = "",
assets: list[dict[str, object]] | None = None,
2026-04-08 06:03:48 -04:00
) -> dict[str, object]:
return {
"slug": lesson_name,
"title": title,
"summary": "",
"path": lesson_path,
"file_path": file_path,
"html_url": html_url,
2026-04-13 18:19:50 -04:00
"raw_base_url": raw_base_url,
"assets": assets or [],
2026-04-08 06:03:48 -04:00
"body": "",
}
2026-04-12 20:23:05 -04:00
def _empty_blog_post(
owner: str,
repo: str,
full_name: str,
post_name: str,
title: str,
summary: str,
updated_at: str,
post_path: str,
*,
file_path: str = "",
html_url: str = "",
2026-04-13 18:19:50 -04:00
raw_base_url: str = "",
assets: list[dict[str, object]] | None = None,
2026-04-12 20:23:05 -04:00
) -> dict[str, object]:
return {
"slug": post_name,
"title": title,
"owner": owner,
"name": repo,
"repo": full_name,
"summary": summary,
"path": post_path,
"file_path": file_path,
"html_url": html_url,
2026-04-13 18:19:50 -04:00
"raw_base_url": raw_base_url,
"assets": assets or [],
2026-04-12 20:23:05 -04:00
"body": "",
"updated_at": updated_at,
}
2026-04-08 06:03:48 -04:00
def _parse_frontmatter(markdown: str) -> tuple[dict[str, str], str]:
if not markdown.startswith("---\n"):
return {}, markdown.strip()
lines = markdown.splitlines()
if not lines or lines[0].strip() != "---":
return {}, markdown.strip()
metadata: dict[str, str] = {}
for index, line in enumerate(lines[1:], start=1):
if line.strip() == "---":
body = "\n".join(lines[index + 1 :]).strip()
return metadata, body
if ":" not in line:
continue
key, raw_value = line.split(":", 1)
key = key.strip()
value = raw_value.strip().strip("\"'")
if key and value:
metadata[key] = value
return {}, markdown.strip()