from __future__ import annotations import base64 from collections.abc import Mapping from typing import Any import httpx from settings import Settings class ForgejoClientError(RuntimeError): pass class ForgejoClient: def __init__(self, settings: Settings) -> None: self._settings = settings self._client = httpx.AsyncClient(timeout=settings.forgejo_request_timeout_seconds) async def __aenter__(self) -> ForgejoClient: return self async def __aexit__(self, _exc_type: object, _exc: object, _tb: object) -> None: await self._client.aclose() async def fetch_openid_configuration(self) -> dict[str, Any]: return await self._get_json( f"{self._settings.forgejo_base_url}/.well-known/openid-configuration", absolute_url=True, ) async def fetch_current_user(self) -> dict[str, Any]: return await self._get_json("/api/v1/user", auth_required=True) async def search_repositories(self) -> list[dict[str, Any]]: payload = await self._get_json( "/api/v1/repos/search", params={ "limit": self._settings.forgejo_repo_scan_limit, "page": 1, }, auth_required=True, ) data = payload.get("data", []) return [repo for repo in data if isinstance(repo, dict)] async def search_recent_issues(self) -> list[dict[str, Any]]: payload = await self._get_json( "/api/v1/repos/issues/search", params={ "state": "open", "page": 1, "limit": self._settings.forgejo_recent_issue_limit, "type": "issues", }, auth_required=True, ) if isinstance(payload, list): return [issue for issue in payload if isinstance(issue, dict)] return [] async def list_directory(self, owner: str, repo: str, path: str = "") -> list[dict[str, Any]]: endpoint = f"/api/v1/repos/{owner}/{repo}/contents" if path: endpoint = f"{endpoint}/{path.strip('/')}" payload = await self._get_json(endpoint, auth_required=True) if isinstance(payload, list): return [entry for entry in payload if isinstance(entry, dict)] if isinstance(payload, dict): return [payload] return [] async def list_issue_comments( self, owner: str, repo: str, issue_number: int, ) -> list[dict[str, Any]]: payload = await self._get_json( f"/api/v1/repos/{owner}/{repo}/issues/{issue_number}/comments", auth_required=True, ) if isinstance(payload, list): return [comment for comment in payload if isinstance(comment, dict)] return [] async def get_file_content(self, owner: str, repo: str, path: str) -> dict[str, str]: payload = await self._get_json( f"/api/v1/repos/{owner}/{repo}/contents/{path.strip('/')}", auth_required=True, ) if not isinstance(payload, dict): raise ForgejoClientError(f"Unexpected file payload for {owner}/{repo}:{path}") raw_content = payload.get("content", "") encoding = payload.get("encoding", "") decoded_content = "" if isinstance(raw_content, str): if encoding == "base64": decoded_content = base64.b64decode(raw_content).decode("utf-8") else: decoded_content = raw_content return { "name": str(payload.get("name", "")), "path": str(payload.get("path", path)), "html_url": str(payload.get("html_url", "")), "content": decoded_content, } async def _get_json( self, path: str, *, absolute_url: bool = False, params: Mapping[str, str | int] | None = None, auth_required: bool = False, ) -> dict[str, Any] | list[Any]: if auth_required and not self._settings.forgejo_token: raise ForgejoClientError( "This Forgejo instance requires an authenticated API token for repo and issue access.", ) url = path if absolute_url else f"{self._settings.forgejo_base_url}{path}" headers = {} if self._settings.forgejo_token: headers["Authorization"] = f"token {self._settings.forgejo_token}" response = await self._client.get(url, params=params, headers=headers) if response.is_success: return response.json() message = self._extract_error_message(response) raise ForgejoClientError(f"{response.status_code} from Forgejo: {message}") @staticmethod def _extract_error_message(response: httpx.Response) -> str: try: payload = response.json() except ValueError: return response.text or response.reason_phrase if isinstance(payload, dict): raw_message = payload.get("message") if isinstance(raw_message, str) and raw_message: return raw_message return response.reason_phrase