robot-u-site/app.py

569 lines
21 KiB
Python
Raw Permalink Normal View History

2026-04-08 06:03:48 -04:00
from __future__ import annotations
2026-04-13 18:19:50 -04:00
import hmac
from hashlib import sha256
2026-04-08 06:03:48 -04:00
from pathlib import Path
from typing import Any
from urllib.parse import urlencode
2026-04-08 06:03:48 -04:00
from fastapi import Body, FastAPI, HTTPException, Request
2026-04-08 06:03:48 -04:00
from fastapi.middleware.cors import CORSMiddleware
2026-04-13 18:19:50 -04:00
from fastapi.responses import FileResponse, JSONResponse, RedirectResponse, StreamingResponse
2026-04-08 06:03:48 -04:00
from fastapi.staticfiles import StaticFiles
from auth import (
OAuthStateRecord,
clear_login_session,
consume_oauth_state,
create_login_session,
create_oauth_state,
current_session_user,
2026-04-14 20:17:29 -04:00
resolve_forgejo_auth,
)
from forgejo_client import ForgejoClient, ForgejoClientError
2026-04-14 20:17:29 -04:00
from live_prototype import (
DISCUSSION_LABEL_NAME,
discussion_card_from_issue,
issue_has_discussion_label,
)
2026-04-13 18:19:50 -04:00
from prototype_cache import PrototypePayloadCache
from settings import Settings, get_settings
2026-04-13 18:19:50 -04:00
from update_events import UpdateBroker, stream_sse_events
2026-04-08 06:03:48 -04:00
# Directory that contains this module; every other on-disk location is derived from it.
BASE_DIR = Path(__file__).resolve().parent
# Output directory of the built frontend bundle (may be absent on a backend-only checkout).
DIST_DIR = BASE_DIR.joinpath("frontend", "dist")
def _safe_return_to(return_to: object) -> str:
    """Return *return_to* when it is a same-site absolute path, otherwise ``"/"``.

    The value comes from the ``return_to`` query parameter and ends up as the
    target of the post-login redirect, so anything that could send the browser
    to another origin is replaced with the site root.
    """
    if not isinstance(return_to, str):
        return "/"
    is_local_path = return_to.startswith("/") and not return_to.startswith("//")
    # Browsers treat "\" like "/" in URLs, and control characters could split
    # the Location header, so both are refused outright.
    has_unsafe_chars = "\\" in return_to or any(ord(char) < 0x20 for char in return_to)
    if not is_local_path or has_unsafe_chars:
        return "/"
    return return_to


def _resolve_static_file(root: Path, relative_path: str) -> Path | None:
    """Return the regular file *relative_path* names inside *root*, or ``None``.

    The candidate is resolved before the containment check, so ``..`` segments,
    absolute paths and symlinks pointing outside *root* all yield ``None``.
    """
    if not relative_path:
        return None
    resolved_root = root.resolve()
    try:
        candidate = (resolved_root / relative_path).resolve()
    except (OSError, RuntimeError, ValueError):
        # Embedded NUL bytes, symlink loops and similar malformed paths.
        return None
    if not candidate.is_relative_to(resolved_root):
        return None
    return candidate if candidate.is_file() else None


def create_app() -> FastAPI:
    """Build the Robot U FastAPI application.

    Registers CORS middleware, the JSON API routes (health, prototype payload,
    auth/session, OAuth start/callback, Forgejo webhook, SSE stream, discussion
    read/write) and, when ``DIST_DIR`` exists, static assets plus a catch-all
    route that serves the built frontend.

    Returns:
        The configured ``FastAPI`` instance. A ``PrototypePayloadCache`` and an
        ``UpdateBroker`` are created per application and shared by its routes.
    """
    settings = get_settings()
    app = FastAPI(title="Robot U Community Site")
    prototype_cache = PrototypePayloadCache()
    update_broker = UpdateBroker()

    app.add_middleware(
        CORSMiddleware,
        allow_origins=list(settings.cors_allow_origins),
        allow_methods=["GET", "POST", "DELETE", "OPTIONS"],
        allow_headers=[
            "Authorization",
            "Content-Type",
            "X-Forgejo-Signature",
            "X-Gitea-Signature",
            "X-Hub-Signature-256",
        ],
    )

    @app.get("/health")
    async def health() -> JSONResponse:
        """Liveness probe."""
        return JSONResponse({"status": "ok"})

    @app.get("/api/prototype")
    async def prototype(request: Request) -> JSONResponse:
        """Return the cached prototype payload plus the caller's auth state."""
        settings = get_settings()
        session_user = current_session_user(request, settings)
        forgejo_token, auth_source, auth_scheme = resolve_forgejo_auth(request, settings)
        payload = await prototype_cache.get(settings)
        auth = await _auth_payload_for_request(
            settings,
            forgejo_token=forgejo_token,
            auth_source=auth_source,
            auth_scheme=auth_scheme,
            session_user=session_user,
        )
        # Build a new dict instead of assigning into `payload`: the cache may
        # hand back its stored object, and per-request auth data must not be
        # written into state shared between requests.
        return JSONResponse({**payload, "auth": auth})

    @app.get("/api/auth/session")
    async def auth_session(request: Request) -> JSONResponse:
        """Describe the caller's authentication state."""
        settings = get_settings()
        session_user = current_session_user(request, settings)
        if session_user:
            return JSONResponse(_auth_payload(session_user, "session"))

        forgejo_token, auth_source, auth_scheme = resolve_forgejo_auth(request, settings)
        # A "server" source is not a caller-supplied credential, so it is
        # reported as unauthenticated.
        if not forgejo_token or auth_source == "server":
            return JSONResponse(_auth_payload(None, "none"))

        async with ForgejoClient(
            settings, forgejo_token=forgejo_token, auth_scheme=auth_scheme
        ) as client:
            try:
                user = await client.fetch_current_user()
            except ForgejoClientError as error:
                raise HTTPException(status_code=401, detail=str(error)) from error
        return JSONResponse(_auth_payload(user, auth_source))

    @app.get("/api/events/stream")
    async def events_stream() -> StreamingResponse:
        """Stream update events from the broker as Server-Sent Events."""
        return StreamingResponse(
            stream_sse_events(update_broker),
            media_type="text/event-stream",
            headers={
                "Cache-Control": "no-store",
                "X-Accel-Buffering": "no",
            },
        )

    @app.post("/api/forgejo/webhook")
    async def forgejo_webhook(request: Request) -> JSONResponse:
        """Invalidate the prototype cache and notify listeners on a Forgejo webhook."""
        settings = get_settings()
        body = await request.body()
        if not _valid_webhook_signature(request, settings, body):
            raise HTTPException(status_code=401, detail="Invalid Forgejo webhook signature.")
        prototype_cache.invalidate()
        await update_broker.publish("content-updated", {"reason": "forgejo-webhook"})
        return JSONResponse({"status": "accepted"})

    @app.get("/api/auth/forgejo/start")
    async def forgejo_auth_start(request: Request, return_to: str = "/") -> RedirectResponse:
        """Start the Forgejo OAuth authorization-code flow (with PKCE S256)."""
        settings = get_settings()
        if not _oauth_configured(settings):
            return _signin_error_redirect("Forgejo OAuth is not configured on the site yet.")

        redirect_uri = _oauth_redirect_uri(request, settings)
        # Only same-site paths are recorded as the post-login destination.
        state, code_challenge = create_oauth_state(redirect_uri, _safe_return_to(return_to))

        async with ForgejoClient(settings) as client:
            try:
                oidc = await client.fetch_openid_configuration()
            except ForgejoClientError as error:
                return _signin_error_redirect(str(error))

        authorization_endpoint = str(oidc.get("authorization_endpoint") or "")
        if not authorization_endpoint:
            return _signin_error_redirect("Forgejo did not return an OAuth authorization endpoint.")

        query = urlencode(
            {
                "client_id": settings.forgejo_oauth_client_id or "",
                "redirect_uri": redirect_uri,
                "response_type": "code",
                "scope": " ".join(settings.forgejo_oauth_scopes),
                "state": state,
                "code_challenge": code_challenge,
                "code_challenge_method": "S256",
            },
        )
        return RedirectResponse(f"{authorization_endpoint}?{query}", status_code=303)

    @app.get("/api/auth/forgejo/callback")
    async def forgejo_auth_callback(
        code: str | None = None,
        state: str | None = None,
        error: str | None = None,
    ) -> RedirectResponse:
        """Finish the OAuth flow: exchange the code, load the user, start a session."""
        if error:
            return _signin_error_redirect(f"Forgejo sign-in failed: {error}")
        if not code or not state:
            return _signin_error_redirect("Forgejo did not return the expected sign-in data.")

        settings = get_settings()
        if not _oauth_configured(settings):
            return _signin_error_redirect("Forgejo OAuth is not configured on the site yet.")

        oauth_state = consume_oauth_state(state)
        if oauth_state is None:
            return _signin_error_redirect("The Forgejo sign-in request expired. Try again.")

        try:
            access_token = await _exchange_forgejo_code(settings, code, oauth_state)
            user = await _fetch_forgejo_oidc_user(settings, access_token)
        except ForgejoClientError as exchange_error:
            return _signin_error_redirect(str(exchange_error))

        # Re-validated here as well so a stored off-site target can never be
        # used as the redirect destination.
        response = RedirectResponse(_safe_return_to(oauth_state.return_to), status_code=303)
        create_login_session(response, settings, access_token, user)
        return response

    @app.delete("/api/auth/session")
    async def delete_auth_session(request: Request) -> JSONResponse:
        """Clear the login session and return the signed-out auth payload."""
        response = JSONResponse(_auth_payload(None, "none"))
        clear_login_session(request, response)
        return response

    @app.get("/api/discussions/{owner}/{repo}/{issue_number}")
    async def discussion_detail(owner: str, repo: str, issue_number: int) -> JSONResponse:
        """Return one discussion-labeled issue of a public repository with its replies."""
        if issue_number < 1:
            raise HTTPException(status_code=400, detail="issue_number must be positive.")

        settings = get_settings()
        async with ForgejoClient(settings, forgejo_token=settings.forgejo_token) as client:
            try:
                repo_payload = await client.fetch_repository(owner, repo)
                if repo_payload.get("private"):
                    raise HTTPException(
                        status_code=403,
                        detail="This site only reads public Forgejo repositories.",
                    )
                issue = await client.fetch_issue(owner, repo, issue_number)
                if not issue_has_discussion_label(issue):
                    raise HTTPException(
                        status_code=404,
                        detail="This issue is not labeled as a discussion.",
                    )
                comments = [
                    _discussion_reply(comment)
                    for comment in await client.list_issue_comments(owner, repo, issue_number)
                ]
            except ForgejoClientError as error:
                raise HTTPException(status_code=502, detail=str(error)) from error

        issue["repository"] = _issue_repository_payload(repo_payload, owner, repo)
        return JSONResponse(discussion_card_from_issue(issue, comments=comments))

    @app.post("/api/discussions/replies")
    async def create_discussion_reply(
        request: Request,
        payload: dict[str, object] = Body(...),
    ) -> JSONResponse:
        """Post a reply to a discussion using the caller's own Forgejo credential."""
        owner = _required_string(payload, "owner")
        repo = _required_string(payload, "repo")
        body = _required_string(payload, "body")
        issue_number = _required_positive_int(payload, "number")

        settings = get_settings()
        forgejo_token, auth_source, auth_scheme = resolve_forgejo_auth(request, settings)
        if not forgejo_token or auth_source == "server":
            raise HTTPException(
                status_code=401,
                detail="Sign in or send an Authorization token before replying.",
            )

        async with ForgejoClient(
            settings, forgejo_token=forgejo_token, auth_scheme=auth_scheme
        ) as client:
            try:
                repo_payload = await client.fetch_repository(owner, repo)
                if repo_payload.get("private"):
                    raise HTTPException(
                        status_code=403,
                        detail="This site only writes to public Forgejo repositories.",
                    )
                comment = await client.create_issue_comment(owner, repo, issue_number, body)
            except ForgejoClientError as error:
                raise HTTPException(status_code=502, detail=str(error)) from error

        prototype_cache.invalidate()
        await update_broker.publish(
            "content-updated",
            {"reason": "discussion-reply", "repo": f"{owner}/{repo}", "number": issue_number},
        )
        return JSONResponse(_discussion_reply(comment))

    @app.post("/api/discussions")
    async def create_discussion(
        request: Request,
        payload: dict[str, object] = Body(...),
    ) -> JSONResponse:
        """Create a discussion issue (labeled accordingly) with the caller's credential."""
        settings = get_settings()
        title = _required_string(payload, "title")
        body = _required_string(payload, "body")
        owner, repo = _discussion_target(payload, settings)
        context_url = _optional_string(payload, "context_url")
        context_title = _optional_string(payload, "context_title")
        context_path = _optional_string(payload, "context_path")
        issue_body = _discussion_issue_body(
            body,
            context_url=context_url,
            context_title=context_title,
            context_path=context_path,
        )

        forgejo_token, auth_source, auth_scheme = resolve_forgejo_auth(request, settings)
        if not forgejo_token or auth_source == "server":
            raise HTTPException(
                status_code=401,
                detail="Sign in or send an Authorization token before starting a discussion.",
            )

        async with ForgejoClient(
            settings, forgejo_token=forgejo_token, auth_scheme=auth_scheme
        ) as client:
            try:
                repo_payload = await client.fetch_repository(owner, repo)
                if repo_payload.get("private"):
                    raise HTTPException(
                        status_code=403,
                        detail="This site only writes to public Forgejo repositories.",
                    )
                discussion_label_id = await client.ensure_repo_label(
                    owner,
                    repo,
                    DISCUSSION_LABEL_NAME,
                    color="#0f6f8f",
                    description="Shown on Robot U as a community discussion.",
                )
                issue = await client.create_issue(
                    owner,
                    repo,
                    title,
                    issue_body,
                    label_ids=[discussion_label_id],
                )
            except ForgejoClientError as error:
                raise HTTPException(status_code=502, detail=str(error)) from error

        issue["repository"] = _issue_repository_payload(repo_payload, owner, repo)
        # If the create response does not carry the label yet, add it locally so
        # the returned card is still recognised as a discussion.
        if not issue_has_discussion_label(issue):
            issue["labels"] = [
                *[label for label in issue.get("labels", []) if isinstance(label, dict)],
                {"id": discussion_label_id, "name": DISCUSSION_LABEL_NAME},
            ]

        prototype_cache.invalidate()
        await update_broker.publish(
            "content-updated",
            {"reason": "discussion-created", "repo": f"{owner}/{repo}"},
        )
        return JSONResponse(discussion_card_from_issue(issue, comments=[]))

    if DIST_DIR.exists():
        assets_dir = DIST_DIR / "assets"
        if assets_dir.exists():
            app.mount("/assets", StaticFiles(directory=str(assets_dir)), name="assets")

        @app.get("/{full_path:path}")
        async def spa_fallback(full_path: str) -> FileResponse:
            """Serve a built frontend file, or ``index.html`` for any other path."""
            # `full_path` is attacker-controlled; only files that really live
            # inside DIST_DIR may be served.
            static_file = _resolve_static_file(DIST_DIR, full_path)
            if static_file is not None:
                return FileResponse(str(static_file))

            # index.html must not be cached so clients pick up new asset names.
            response = FileResponse(str(DIST_DIR / "index.html"))
            response.headers["Cache-Control"] = "no-store, no-cache, must-revalidate"
            response.headers["Pragma"] = "no-cache"
            response.headers["Expires"] = "0"
            return response

    return app
# Application instance built once when this module is imported.
# NOTE(review): presumably the object an ASGI server is pointed at (e.g. "app:app") — confirm.
app = create_app()
def _required_string(payload: dict[str, object], key: str) -> str:
    """Return the stripped string stored under *key*.

    Raises:
        HTTPException: 400 when the value is missing, not a string, or blank.
    """
    raw = payload.get(key)
    text = raw.strip() if isinstance(raw, str) else ""
    if text:
        return text
    raise HTTPException(status_code=400, detail=f"{key} is required.")
2026-04-13 18:19:50 -04:00
def _optional_string(payload: dict[str, object], key: str) -> str | None:
    """Return the stripped string under *key*, or ``None`` when absent or blank.

    Raises:
        HTTPException: 400 when a value is present but is not a string.
    """
    raw = payload.get(key)
    if raw is None:
        return None
    if isinstance(raw, str):
        # An all-whitespace value is treated the same as a missing one.
        return raw.strip() or None
    raise HTTPException(status_code=400, detail=f"{key} must be a string.")
def _required_positive_int(payload: dict[str, object], key: str) -> int:
    """Return the value under *key* as an integer greater than zero.

    Accepts an ``int`` or a string made only of decimal digits.

    Raises:
        HTTPException: 400 for anything else (missing, bool, float, signed or
            non-decimal strings, zero, negatives).
    """
    message = f"{key} must be a positive integer."
    value = payload.get(key)

    # bool is a subclass of int, so it has to be rejected before the int check.
    if isinstance(value, bool):
        raise HTTPException(status_code=400, detail=message)

    if isinstance(value, int):
        parsed = value
    elif isinstance(value, str) and value.isdecimal():
        # isdecimal() rather than isdigit(): isdigit() also accepts characters
        # such as "²" that int() cannot parse, which previously escaped as an
        # unhandled ValueError instead of a 400.
        try:
            parsed = int(value)
        except ValueError:
            # e.g. a digit string longer than the interpreter's conversion limit.
            raise HTTPException(status_code=400, detail=message) from None
    else:
        raise HTTPException(status_code=400, detail=message)

    if parsed < 1:
        raise HTTPException(status_code=400, detail=message)
    return parsed
2026-04-13 18:19:50 -04:00
def _discussion_target(payload: dict[str, object], settings: Settings) -> tuple[str, str]:
    """Work out which ``(owner, repo)`` a new discussion should be created in.

    An explicit ``owner``/``repo`` pair in *payload* wins; otherwise the
    ``"owner/repo"`` value of ``settings.forgejo_general_discussion_repo`` is used.

    Raises:
        HTTPException: 400 when only one of owner/repo is supplied, or when no
            usable general discussion repo is configured.
    """
    owner = _optional_string(payload, "owner")
    repo = _optional_string(payload, "repo")

    if owner and repo:
        return owner, repo
    if owner or repo:
        raise HTTPException(status_code=400, detail="owner and repo must be provided together.")

    fallback = settings.forgejo_general_discussion_repo
    if fallback:
        # Split on the first "/" only; both halves must be non-empty.
        fallback_owner, _, fallback_repo = fallback.strip().partition("/")
        if fallback_owner and fallback_repo:
            return fallback_owner, fallback_repo

    raise HTTPException(
        status_code=400,
        detail="General discussion repo is not configured.",
    )
def _discussion_issue_body(
    body: str,
    *,
    context_url: str | None,
    context_title: str | None,
    context_path: str | None,
) -> str:
    """Append a "related content" footer to *body* when any context is given.

    The footer follows a ``---`` separator and lists, in order, the title, the
    canonical URL and the source path that were supplied. With no context the
    body is returned unchanged.
    """
    labelled_context = (
        ("Related content", context_title),
        ("Canonical URL", context_url),
        ("Source path", context_path),
    )
    footer = [f"{label}: {value}" for label, value in labelled_context if value]
    if footer:
        return f"{body}\n\n---\n" + "\n".join(footer)
    return body
def _issue_repository_payload(
    repo_payload: dict[str, Any],
    owner: str,
    repo: str,
) -> dict[str, object]:
    """Build the compact repository summary attached to an issue.

    Values are taken from *repo_payload* where present and fall back to the
    *owner*/*repo* arguments otherwise. ``private`` is always ``False`` in the
    returned summary.
    """
    owner_info = repo_payload.get("owner") or {}
    if isinstance(owner_info, dict):
        login = owner_info.get("login")
    else:
        login = owner

    summary: dict[str, object] = {}
    summary["owner"] = login or owner
    summary["name"] = repo_payload.get("name") or repo
    summary["full_name"] = repo_payload.get("full_name") or f"{owner}/{repo}"
    summary["private"] = False
    return summary
def _discussion_reply(comment: dict[str, Any]) -> dict[str, object]:
    """Convert a comment payload into the reply shape returned by this API.

    Missing fields fall back to neutral defaults, and an empty body is replaced
    with a placeholder sentence.
    """
    author = comment.get("user") or {}
    text = str(comment.get("body", "") or "").strip() or "No comment body provided."
    return dict(
        id=int(comment.get("id", 0)),
        author=author.get("login", "Unknown author"),
        avatar_url=author.get("avatar_url", ""),
        body=text,
        created_at=comment.get("created_at", ""),
        html_url=comment.get("html_url", ""),
    )
def _auth_payload(user: dict[str, Any] | None, source: str) -> dict[str, object]:
    """Build the auth-state dict sent to the frontend.

    Args:
        user: The signed-in user's data, or a falsy value when nobody is signed in.
        source: How the caller authenticated; only ``"authorization"`` and
            ``"session"`` allow replying.
    """
    oauth_configured = _oauth_configured(get_settings())
    # A falsy user (None or an empty dict) is reported as signed out.
    login = user.get("login", "Unknown user") if user else None
    return {
        "authenticated": bool(user),
        "login": login,
        "source": source,
        "can_reply": source in {"authorization", "session"},
        "oauth_configured": oauth_configured,
    }
2026-04-13 18:19:50 -04:00
async def _auth_payload_for_request(
    settings: Settings,
    *,
    forgejo_token: str | None,
    auth_source: str,
    auth_scheme: str,
    session_user: dict[str, Any] | None,
) -> dict[str, object]:
    """Resolve the auth payload for a request from its session or token.

    A session user takes precedence. Otherwise a caller-supplied token is
    checked by fetching the current user from Forgejo with it; a missing token
    or a ``"server"`` source yields the signed-out payload.

    Raises:
        HTTPException: 401 when fetching the current user fails.
    """
    if session_user:
        return _auth_payload(session_user, "session")

    caller_supplied_token = bool(forgejo_token) and auth_source != "server"
    if not caller_supplied_token:
        return _auth_payload(None, "none")

    async with ForgejoClient(
        settings, forgejo_token=forgejo_token, auth_scheme=auth_scheme
    ) as forgejo:
        try:
            current_user = await forgejo.fetch_current_user()
        except ForgejoClientError as error:
            raise HTTPException(status_code=401, detail=str(error)) from error
        else:
            return _auth_payload(current_user, auth_source)
def _valid_webhook_signature(request: Request, settings: Settings, body: bytes) -> bool:
    """Check the HMAC-SHA256 signature of a webhook delivery.

    The hex digest of *body* keyed with ``settings.forgejo_webhook_secret`` is
    compared against the ``X-Forgejo-Signature``, ``X-Gitea-Signature`` and
    ``X-Hub-Signature-256`` (``sha256=`` prefix removed) headers.

    Returns:
        ``True`` when any header matches. Also ``True`` when no secret is
        configured, in which case deliveries are accepted unverified.
    """
    secret = settings.forgejo_webhook_secret
    if not secret:
        # NOTE(review): with no secret configured every delivery is accepted;
        # confirm that production deployments always set one.
        return True

    expected = hmac.new(secret.encode("utf-8"), body, sha256).hexdigest().encode("ascii")
    headers = request.headers
    candidates = (
        headers.get("x-forgejo-signature", ""),
        headers.get("x-gitea-signature", ""),
        headers.get("x-hub-signature-256", "").removeprefix("sha256="),
    )
    # Compare as bytes: hmac.compare_digest() raises TypeError for str inputs
    # containing non-ASCII characters, which a caller can trigger with a crafted
    # header and which would surface as a 500 instead of a rejected signature.
    return any(
        hmac.compare_digest(expected, candidate.strip().encode("utf-8"))
        for candidate in candidates
    )
def _oauth_configured(settings: Settings) -> bool:
    """Report whether the session secret and both OAuth client credentials are set."""
    if not settings.auth_secret_key:
        return False
    if not settings.forgejo_oauth_client_id:
        return False
    return bool(settings.forgejo_oauth_client_secret)
def _oauth_redirect_uri(request: Request, settings: Settings) -> str:
    """Return the absolute URL of the OAuth callback endpoint.

    ``settings.app_base_url`` is used when set; otherwise the URL is derived
    from the incoming request via the ``forgejo_auth_callback`` route name.
    """
    # Drop surrounding whitespace and trailing slashes so a base URL written as
    # "https://host/" does not produce "https://host//api/...", which would not
    # match the redirect URI registered with the OAuth application.
    base_url = (settings.app_base_url or "").strip().rstrip("/")
    if base_url:
        return f"{base_url}/api/auth/forgejo/callback"
    return str(request.url_for("forgejo_auth_callback"))
def _signin_error_redirect(message: str) -> RedirectResponse:
    """Redirect (303) to the sign-in page with *message* in the ``error`` query parameter."""
    query = urlencode({"error": message})
    return RedirectResponse("/signin?" + query, status_code=303)
async def _exchange_forgejo_code(
    settings: Settings,
    code: str,
    oauth_state: OAuthStateRecord,
) -> str:
    """Exchange an OAuth authorization *code* for an access token.

    The token endpoint is read from Forgejo's OpenID configuration, and the
    redirect URI and PKCE verifier come from *oauth_state*.

    Raises:
        ForgejoClientError: when no token endpoint is advertised or the token
            response carries no usable ``access_token``.
    """
    async with ForgejoClient(settings) as forgejo:
        discovery = await forgejo.fetch_openid_configuration()
        endpoint = str(discovery.get("token_endpoint") or "")
        if endpoint == "":
            raise ForgejoClientError("Forgejo did not return an OAuth token endpoint.")

        exchange_args = {
            "token_endpoint": endpoint,
            "client_id": settings.forgejo_oauth_client_id or "",
            "client_secret": settings.forgejo_oauth_client_secret or "",
            "code": code,
            "redirect_uri": oauth_state.redirect_uri,
            "code_verifier": oauth_state.code_verifier,
        }
        token_response = await forgejo.exchange_oauth_code(**exchange_args)

    token = token_response.get("access_token")
    if isinstance(token, str) and token:
        return token
    raise ForgejoClientError("Forgejo did not return an access token.")
async def _fetch_forgejo_oidc_user(settings: Settings, access_token: str) -> dict[str, Any]:
    """Load the signed-in user's identity from the OIDC UserInfo endpoint.

    The login is the first non-empty value among the ``preferred_username``,
    ``name`` and ``sub`` claims.

    Returns:
        A dict with ``login``, ``avatar_url`` and ``email`` keys.

    Raises:
        ForgejoClientError: when no UserInfo endpoint is advertised or no
            usable string identity is returned.
    """
    async with ForgejoClient(settings) as forgejo:
        discovery = await forgejo.fetch_openid_configuration()
        endpoint = str(discovery.get("userinfo_endpoint") or "")
        if endpoint == "":
            raise ForgejoClientError("Forgejo did not return an OIDC UserInfo endpoint.")
        claims = await forgejo.fetch_userinfo(endpoint, access_token)

    login = None
    for claim_name in ("preferred_username", "name", "sub"):
        login = claims.get(claim_name)
        if login:
            break

    if not (isinstance(login, str) and login):
        raise ForgejoClientError("Forgejo did not return a usable user identity.")

    return {
        "login": login,
        "avatar_url": claims.get("picture", ""),
        "email": claims.get("email", ""),
    }