From d84a885fdb5eb739e3c5bf1873e7a221541dce1d Mon Sep 17 00:00:00 2001 From: kacper Date: Sun, 12 Apr 2026 22:02:47 -0400 Subject: [PATCH] Use encrypted cookie sessions --- .env.example | 2 ++ AGENTS.md | 4 ++- README.md | 7 ++-- app.py | 14 +++++--- auth.py | 91 +++++++++++++++++++++++++++++++++++++---------- live_prototype.py | 4 ++- requirements.txt | 1 + settings.py | 11 ++++++ tests/test_app.py | 24 +++++++++++++ 9 files changed, 131 insertions(+), 27 deletions(-) diff --git a/.env.example b/.env.example index 73071e3..79c34f9 100644 --- a/.env.example +++ b/.env.example @@ -1,4 +1,6 @@ APP_BASE_URL=http://kacper-dev-pod:8800 +AUTH_SECRET_KEY=replace-with-a-random-32-byte-or-longer-secret +AUTH_COOKIE_SECURE=false FORGEJO_BASE_URL=https://aksal.cloud FORGEJO_TOKEN= FORGEJO_OAUTH_CLIENT_ID= diff --git a/AGENTS.md b/AGENTS.md index 2e9d085..f4dbff8 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -63,6 +63,8 @@ Useful variables: - `FORGEJO_BASE_URL=https://aksal.cloud` - `APP_BASE_URL=http://kacper-dev-pod:8800` +- `AUTH_SECRET_KEY=...` +- `AUTH_COOKIE_SECURE=false` - `FORGEJO_OAUTH_CLIENT_ID=...` - `FORGEJO_OAUTH_CLIENT_SECRET=...` - `FORGEJO_OAUTH_SCOPES=openid profile` @@ -74,7 +76,7 @@ Useful variables: Notes: - Browser sign-in uses Forgejo OAuth/OIDC. `APP_BASE_URL` must match the URL opened in the browser, and the Forgejo OAuth app must include `/api/auth/forgejo/callback` under that base URL. -- Browser OAuth requests only identity scopes. The backend stores the resulting Forgejo token server-side and may use it only after enforcing public-repository checks. +- Browser OAuth requests only identity scopes. The backend stores the resulting Forgejo token in an encrypted `HttpOnly` cookie and may use it only after enforcing public-repository checks. - `FORGEJO_TOKEN` is optional and should be treated as a read-only local fallback. Browser sessions and API token calls may write comments only after verifying the target repo is public. - API clients can query with `Authorization: token ...` or `Authorization: Bearer ...`. - `CALENDAR_FEED_URLS` is optional and accepts comma-separated `webcal://` or `https://` ICS feeds. diff --git a/README.md b/README.md index 25cec52..8d05bf9 100644 --- a/README.md +++ b/README.md @@ -39,6 +39,7 @@ Optional live Forgejo configuration: ```bash export APP_BASE_URL="http://kacper-dev-pod:8800" +export AUTH_SECRET_KEY="$(openssl rand -hex 32)" export FORGEJO_BASE_URL="https://aksal.cloud" export FORGEJO_OAUTH_CLIENT_ID="your-forgejo-oauth-client-id" export FORGEJO_OAUTH_CLIENT_SECRET="your-forgejo-oauth-client-secret" @@ -52,7 +53,9 @@ export CALENDAR_FEED_URLS="webcal://example.com/calendar.ics,https://example.com http://kacper-dev-pod:8800/api/auth/forgejo/callback ``` -`FORGEJO_TOKEN` is optional. When set, it is a read fallback for local development. Browser OAuth requests only identity scopes, then the backend uses the signed-in user's Forgejo identity for public repo reads and public issue replies. The backend must verify repositories are public before reading discussion data or writing comments. +`AUTH_SECRET_KEY` is required for Forgejo OAuth sign-in. It encrypts the `HttpOnly` browser session cookie that carries the signed-in user's Forgejo token and identity. Set `AUTH_COOKIE_SECURE=true` when serving over HTTPS. + +`FORGEJO_TOKEN` is optional. When set, it is a read fallback for local development. Browser OAuth requests only identity scopes, then the backend uses the signed-in user's Forgejo identity from the encrypted session cookie for public repo reads and public issue replies. The backend must verify repositories are public before reading discussion data or writing comments. Or put those values in `.env`: @@ -60,7 +63,7 @@ Or put those values in `.env`: cp .env.example .env ``` -Sign in through `/signin` using Forgejo OAuth, or query the API directly with: +Use the site `Sign in` button for Forgejo OAuth, or query the API directly with: ```bash curl -H "Authorization: token your-forgejo-api-token" http://127.0.0.1:8800/api/prototype diff --git a/app.py b/app.py index 6d83e64..9e24488 100644 --- a/app.py +++ b/app.py @@ -43,7 +43,7 @@ def create_app() -> FastAPI: @app.get("/api/prototype") async def prototype(request: Request) -> JSONResponse: settings = get_settings() - session_user = current_session_user(request) + session_user = current_session_user(request, settings) forgejo_token, auth_source = resolve_forgejo_token(request, settings) return JSONResponse( await build_live_prototype_payload( @@ -56,11 +56,11 @@ def create_app() -> FastAPI: @app.get("/api/auth/session") async def auth_session(request: Request) -> JSONResponse: - session_user = current_session_user(request) + settings = get_settings() + session_user = current_session_user(request, settings) if session_user: return JSONResponse(_auth_payload(session_user, "session")) - settings = get_settings() forgejo_token, auth_source = resolve_forgejo_token(request, settings) if not forgejo_token or auth_source == "server": return JSONResponse(_auth_payload(None, "none")) @@ -131,7 +131,7 @@ def create_app() -> FastAPI: return _signin_error_redirect(str(exchange_error)) response = RedirectResponse(oauth_state.return_to, status_code=303) - create_login_session(response, access_token, user) + create_login_session(response, settings, access_token, user) return response @app.delete("/api/auth/session") @@ -255,7 +255,11 @@ def _auth_payload(user: dict[str, Any] | None, source: str) -> dict[str, object] def _oauth_configured(settings: Settings) -> bool: - return bool(settings.forgejo_oauth_client_id and settings.forgejo_oauth_client_secret) + return bool( + settings.auth_secret_key + and settings.forgejo_oauth_client_id + and settings.forgejo_oauth_client_secret + ) def _oauth_redirect_uri(request: Request, settings: Settings) -> str: diff --git a/auth.py b/auth.py index ddf0be5..2616a00 100644 --- a/auth.py +++ b/auth.py @@ -1,5 +1,6 @@ from __future__ import annotations +import json import secrets import time from base64 import urlsafe_b64encode @@ -7,6 +8,7 @@ from dataclasses import dataclass from hashlib import sha256 from typing import Any +from cryptography.fernet import Fernet, InvalidToken from fastapi import Request, Response from settings import Settings @@ -21,6 +23,7 @@ class SessionRecord: forgejo_token: str | None user: dict[str, Any] created_at: float + expires_at: float @dataclass @@ -31,7 +34,6 @@ class OAuthStateRecord: created_at: float -_SESSIONS: dict[str, SessionRecord] = {} _OAUTH_STATES: dict[str, OAuthStateRecord] = {} @@ -40,7 +42,7 @@ def resolve_forgejo_token(request: Request, settings: Settings) -> tuple[str | N if header_token: return header_token, "authorization" - session = _session_from_request(request) + session = _session_from_request(request, settings) if session and session.forgejo_token: return session.forgejo_token, "session" @@ -50,26 +52,30 @@ def resolve_forgejo_token(request: Request, settings: Settings) -> tuple[str | N return None, "none" -def current_session_user(request: Request) -> dict[str, Any] | None: - session = _session_from_request(request) +def current_session_user(request: Request, settings: Settings) -> dict[str, Any] | None: + session = _session_from_request(request, settings) return session.user if session else None def create_login_session( response: Response, + settings: Settings, forgejo_token: str | None, user: dict[str, Any], ) -> None: - session_id = secrets.token_urlsafe(32) - _SESSIONS[session_id] = SessionRecord( + created_at = time.time() + session = SessionRecord( forgejo_token=forgejo_token, user=user, - created_at=time.time(), + created_at=created_at, + expires_at=created_at + SESSION_MAX_AGE_SECONDS, ) + encrypted_session = _encrypt_session(session, settings) response.set_cookie( SESSION_COOKIE_NAME, - session_id, + encrypted_session, httponly=True, + secure=settings.auth_cookie_secure, samesite="lax", max_age=SESSION_MAX_AGE_SECONDS, path="/", @@ -77,9 +83,6 @@ def create_login_session( def clear_login_session(request: Request, response: Response) -> None: - session_id = request.cookies.get(SESSION_COOKIE_NAME) - if session_id: - _SESSIONS.pop(session_id, None) response.delete_cookie(SESSION_COOKIE_NAME, path="/") @@ -126,17 +129,69 @@ def _safe_return_path(value: str) -> str: return value -def _session_from_request(request: Request) -> SessionRecord | None: - session_id = request.cookies.get(SESSION_COOKIE_NAME) - if not session_id: +def _session_from_request(request: Request, settings: Settings) -> SessionRecord | None: + encrypted_session = request.cookies.get(SESSION_COOKIE_NAME) + if not encrypted_session or not settings.auth_secret_key: return None - session = _SESSIONS.get(session_id) - if not session: + try: + payload = _session_cipher(settings).decrypt( + encrypted_session.encode("utf-8"), + ttl=SESSION_MAX_AGE_SECONDS, + ) + raw_session = json.loads(payload.decode("utf-8")) + except (InvalidToken, json.JSONDecodeError, UnicodeDecodeError, ValueError): return None - if time.time() - session.created_at > SESSION_MAX_AGE_SECONDS: - _SESSIONS.pop(session_id, None) + session = _session_from_payload(raw_session) + if session is None or session.expires_at <= time.time(): return None return session + + +def _encrypt_session(session: SessionRecord, settings: Settings) -> str: + payload = { + "forgejo_token": session.forgejo_token, + "user": session.user, + "created_at": session.created_at, + "expires_at": session.expires_at, + } + return ( + _session_cipher(settings) + .encrypt( + json.dumps(payload, separators=(",", ":")).encode("utf-8"), + ) + .decode("utf-8") + ) + + +def _session_from_payload(payload: object) -> SessionRecord | None: + if not isinstance(payload, dict): + return None + + forgejo_token = payload.get("forgejo_token") + user = payload.get("user") + created_at = payload.get("created_at") + expires_at = payload.get("expires_at") + if forgejo_token is not None and not isinstance(forgejo_token, str): + return None + if not isinstance(user, dict): + return None + if not isinstance(created_at, (int, float)) or not isinstance(expires_at, (int, float)): + return None + + return SessionRecord( + forgejo_token=forgejo_token, + user=user, + created_at=float(created_at), + expires_at=float(expires_at), + ) + + +def _session_cipher(settings: Settings) -> Fernet: + if not settings.auth_secret_key: + raise ValueError("AUTH_SECRET_KEY is required for encrypted login sessions.") + + key = urlsafe_b64encode(sha256(settings.auth_secret_key.encode("utf-8")).digest()) + return Fernet(key) diff --git a/live_prototype.py b/live_prototype.py index d52658b..b1ec223 100644 --- a/live_prototype.py +++ b/live_prototype.py @@ -481,7 +481,9 @@ def _auth_payload( settings: Settings, ) -> dict[str, object]: oauth_configured = bool( - settings.forgejo_oauth_client_id and settings.forgejo_oauth_client_secret + settings.auth_secret_key + and settings.forgejo_oauth_client_id + and settings.forgejo_oauth_client_secret ) if not user: return { diff --git a/requirements.txt b/requirements.txt index 577b6e0..e69db25 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,4 @@ fastapi>=0.116.0,<1.0.0 uvicorn[standard]>=0.35.0,<1.0.0 httpx>=0.28.0,<1.0.0 +cryptography>=42.0.0,<47.0.0 diff --git a/settings.py b/settings.py index 0076f15..a2686ad 100644 --- a/settings.py +++ b/settings.py @@ -8,6 +8,8 @@ from functools import lru_cache @dataclass(frozen=True) class Settings: app_base_url: str | None + auth_secret_key: str | None + auth_cookie_secure: bool forgejo_base_url: str forgejo_token: str | None forgejo_oauth_client_id: str | None @@ -44,12 +46,21 @@ def _parse_scopes(raw_value: str | None) -> tuple[str, ...]: return tuple(scope for scope in value.replace(",", " ").split() if scope) +def _parse_bool(raw_value: str | None, *, default: bool = False) -> bool: + value = (raw_value or "").strip().lower() + if not value: + return default + return value in {"1", "true", "yes", "on"} + + @lru_cache(maxsize=1) def get_settings() -> Settings: return Settings( app_base_url=_normalize_base_url(os.getenv("APP_BASE_URL")) if os.getenv("APP_BASE_URL") else None, + auth_secret_key=os.getenv("AUTH_SECRET_KEY") or None, + auth_cookie_secure=_parse_bool(os.getenv("AUTH_COOKIE_SECURE")), forgejo_base_url=_normalize_base_url(os.getenv("FORGEJO_BASE_URL")), forgejo_token=os.getenv("FORGEJO_TOKEN") or None, forgejo_oauth_client_id=os.getenv("FORGEJO_OAUTH_CLIENT_ID") or None, diff --git a/tests/test_app.py b/tests/test_app.py index ea62778..833b114 100644 --- a/tests/test_app.py +++ b/tests/test_app.py @@ -17,6 +17,7 @@ class AppTestCase(unittest.TestCase): os.environ, { "APP_BASE_URL": "http://testserver", + "AUTH_SECRET_KEY": "test-auth-secret-key-that-is-long-enough", "FORGEJO_TOKEN": "", "FORGEJO_OAUTH_CLIENT_ID": "client-id", "FORGEJO_OAUTH_CLIENT_SECRET": "client-secret", @@ -172,6 +173,7 @@ class AppTestCase(unittest.TestCase): self.assertEqual(callback_response.status_code, 303) self.assertEqual(callback_response.headers["location"], "/discussions/7") self.assertIn("robot_u_session", callback_response.cookies) + self.assertNotIn("oauth-token", callback_response.cookies["robot_u_session"]) self.assertEqual(fake_client.exchanged_code, "auth-code") session_response = self.client.get("/api/auth/session") @@ -219,6 +221,28 @@ class AppTestCase(unittest.TestCase): self.assertEqual(builder.call_args.kwargs["auth_source"], "session") self.assertEqual(builder.call_args.kwargs["session_user"]["login"], "kacper") + def test_encrypted_session_cookie_survives_new_app_instance(self) -> None: + fake_client = _FakeForgejoClient(user={"login": "kacper"}, access_token="oauth-token") + with patch("app.ForgejoClient", return_value=fake_client): + start_response = self.client.get( + "/api/auth/forgejo/start", + follow_redirects=False, + ) + state = parse_qs(urlparse(start_response.headers["location"]).query)["state"][0] + callback_response = self.client.get( + f"/api/auth/forgejo/callback?code=auth-code&state={state}", + follow_redirects=False, + ) + + fresh_client = TestClient(create_app()) + fresh_client.cookies.set("robot_u_session", callback_response.cookies["robot_u_session"]) + session_response = fresh_client.get("/api/auth/session") + + self.assertEqual(session_response.status_code, 200) + self.assertEqual(session_response.json()["authenticated"], True) + self.assertEqual(session_response.json()["login"], "kacper") + self.assertEqual(session_response.json()["source"], "session") + def test_create_discussion_reply(self) -> None: fake_client = _FakeForgejoClient( comment={