robot-u-site/forgejo_client.py
kacper 853e99ca5f
Some checks failed
CI / check (push) Failing after 8s
Prepare deployment and Forgejo CI
2026-04-14 20:17:29 -04:00

356 lines
12 KiB
Python

from __future__ import annotations
import base64
from collections.abc import Mapping
from typing import Any
import httpx
from settings import Settings
class ForgejoClientError(RuntimeError):
pass
class ForgejoClient:
def __init__(
self,
settings: Settings,
forgejo_token: str | None = None,
auth_scheme: str = "token",
) -> None:
self._settings = settings
self._forgejo_token = forgejo_token or settings.forgejo_token
self._auth_scheme = auth_scheme
self._client = httpx.AsyncClient(timeout=settings.forgejo_request_timeout_seconds)
async def __aenter__(self) -> ForgejoClient:
return self
async def __aexit__(self, _exc_type: object, _exc: object, _tb: object) -> None:
await self._client.aclose()
async def fetch_openid_configuration(self) -> dict[str, Any]:
return await self._get_json(
f"{self._settings.forgejo_base_url}/.well-known/openid-configuration",
absolute_url=True,
)
async def exchange_oauth_code(
self,
*,
token_endpoint: str,
client_id: str,
client_secret: str,
code: str,
redirect_uri: str,
code_verifier: str,
) -> dict[str, Any]:
response = await self._client.post(
token_endpoint,
data={
"grant_type": "authorization_code",
"client_id": client_id,
"client_secret": client_secret,
"code": code,
"redirect_uri": redirect_uri,
"code_verifier": code_verifier,
},
headers={"Accept": "application/json"},
)
if response.is_success:
payload = response.json()
if isinstance(payload, dict):
return payload
raise ForgejoClientError("Unexpected Forgejo OAuth token payload.")
message = self._extract_error_message(response)
raise ForgejoClientError(f"{response.status_code} from Forgejo OAuth: {message}")
async def fetch_userinfo(self, userinfo_endpoint: str, access_token: str) -> dict[str, Any]:
response = await self._client.get(
userinfo_endpoint,
headers={
"Accept": "application/json",
"Authorization": f"Bearer {access_token}",
},
)
if response.is_success:
payload = response.json()
if isinstance(payload, dict):
return payload
raise ForgejoClientError("Unexpected Forgejo UserInfo payload.")
message = self._extract_error_message(response)
raise ForgejoClientError(f"{response.status_code} from Forgejo UserInfo: {message}")
async def fetch_current_user(self) -> dict[str, Any]:
return await self._get_json("/api/v1/user", auth_required=True)
async def search_repositories(self) -> list[dict[str, Any]]:
scan_limit = max(self._settings.forgejo_repo_scan_limit, 1)
page_limit = min(scan_limit, 50)
repos: list[dict[str, Any]] = []
page = 1
while len(repos) < scan_limit:
payload = await self._get_json(
"/api/v1/repos/search",
params={
"limit": page_limit,
"page": page,
"private": "false",
"is_private": "false",
},
)
if not isinstance(payload, dict):
break
data = payload.get("data", [])
page_repos = [repo for repo in data if isinstance(repo, dict)]
repos.extend(page_repos)
if len(page_repos) < page_limit:
break
page += 1
return repos[:scan_limit]
async def fetch_repository(self, owner: str, repo: str) -> dict[str, Any]:
payload = await self._get_json(
f"/api/v1/repos/{owner}/{repo}",
)
if isinstance(payload, dict):
return payload
raise ForgejoClientError(f"Unexpected repository payload for {owner}/{repo}")
async def list_repo_issues(
self,
owner: str,
repo: str,
*,
limit: int,
) -> list[dict[str, Any]]:
payload = await self._get_json(
f"/api/v1/repos/{owner}/{repo}/issues",
params={
"state": "open",
"page": 1,
"limit": limit,
"type": "issues",
"sort": "recentupdate",
},
)
if isinstance(payload, list):
return [issue for issue in payload if isinstance(issue, dict)]
return []
async def fetch_issue(self, owner: str, repo: str, issue_number: int) -> dict[str, Any]:
payload = await self._get_json(f"/api/v1/repos/{owner}/{repo}/issues/{issue_number}")
if isinstance(payload, dict):
return payload
raise ForgejoClientError(f"Unexpected issue payload for {owner}/{repo}#{issue_number}")
async def list_directory(self, owner: str, repo: str, path: str = "") -> list[dict[str, Any]]:
endpoint = f"/api/v1/repos/{owner}/{repo}/contents"
if path:
endpoint = f"{endpoint}/{path.strip('/')}"
payload = await self._get_json(endpoint)
if isinstance(payload, list):
return [entry for entry in payload if isinstance(entry, dict)]
if isinstance(payload, dict):
return [payload]
return []
async def list_issue_comments(
self,
owner: str,
repo: str,
issue_number: int,
) -> list[dict[str, Any]]:
payload = await self._get_json(
f"/api/v1/repos/{owner}/{repo}/issues/{issue_number}/comments",
)
if isinstance(payload, list):
return [comment for comment in payload if isinstance(comment, dict)]
return []
async def create_issue_comment(
self,
owner: str,
repo: str,
issue_number: int,
body: str,
) -> dict[str, Any]:
payload = await self._request_json(
"POST",
f"/api/v1/repos/{owner}/{repo}/issues/{issue_number}/comments",
json_payload={"body": body},
auth_required=True,
)
if isinstance(payload, dict):
return payload
raise ForgejoClientError(f"Unexpected comment payload for {owner}/{repo}#{issue_number}")
async def create_issue(
self,
owner: str,
repo: str,
title: str,
body: str,
label_ids: list[int] | None = None,
) -> dict[str, Any]:
payload_data: dict[str, object] = {"title": title, "body": body}
if label_ids:
payload_data["labels"] = label_ids
payload = await self._request_json(
"POST",
f"/api/v1/repos/{owner}/{repo}/issues",
json_payload=payload_data,
auth_required=True,
)
if isinstance(payload, dict):
return payload
raise ForgejoClientError(f"Unexpected issue payload for {owner}/{repo}")
async def ensure_repo_label(
self,
owner: str,
repo: str,
name: str,
*,
color: str,
description: str,
) -> int:
for label in await self.list_repo_labels(owner, repo):
if str(label.get("name", "")).strip().casefold() != name.casefold():
continue
label_id = int(label.get("id", 0) or 0)
if label_id > 0:
return label_id
label = await self.create_repo_label(
owner, repo, name, color=color, description=description
)
label_id = int(label.get("id", 0) or 0)
if label_id <= 0:
raise ForgejoClientError(f"Forgejo did not return an id for label {name!r}.")
return label_id
async def list_repo_labels(self, owner: str, repo: str) -> list[dict[str, Any]]:
payload = await self._get_json(
f"/api/v1/repos/{owner}/{repo}/labels",
params={"page": 1, "limit": 100},
auth_required=True,
)
if isinstance(payload, list):
return [label for label in payload if isinstance(label, dict)]
return []
async def create_repo_label(
self,
owner: str,
repo: str,
name: str,
*,
color: str,
description: str,
) -> dict[str, Any]:
payload = await self._request_json(
"POST",
f"/api/v1/repos/{owner}/{repo}/labels",
json_payload={"name": name, "color": color, "description": description},
auth_required=True,
)
if isinstance(payload, dict):
return payload
raise ForgejoClientError(f"Unexpected label payload for {owner}/{repo}")
async def get_file_content(self, owner: str, repo: str, path: str) -> dict[str, str]:
payload = await self._get_json(
f"/api/v1/repos/{owner}/{repo}/contents/{path.strip('/')}",
)
if not isinstance(payload, dict):
raise ForgejoClientError(f"Unexpected file payload for {owner}/{repo}:{path}")
raw_content = payload.get("content", "")
encoding = payload.get("encoding", "")
decoded_content = ""
if isinstance(raw_content, str):
if encoding == "base64":
decoded_content = base64.b64decode(raw_content).decode("utf-8")
else:
decoded_content = raw_content
return {
"name": str(payload.get("name", "")),
"path": str(payload.get("path", path)),
"html_url": str(payload.get("html_url", "")),
"content": decoded_content,
}
async def _get_json(
self,
path: str,
*,
absolute_url: bool = False,
params: Mapping[str, str | int] | None = None,
auth_required: bool = False,
) -> dict[str, Any] | list[Any]:
return await self._request_json(
"GET",
path,
absolute_url=absolute_url,
params=params,
auth_required=auth_required,
)
async def _request_json(
self,
method: str,
path: str,
*,
absolute_url: bool = False,
params: Mapping[str, str | int] | None = None,
json_payload: Mapping[str, object] | None = None,
auth_required: bool = False,
) -> dict[str, Any] | list[Any]:
if auth_required and not self._forgejo_token:
raise ForgejoClientError(
"This Forgejo instance requires an authenticated API token for repo and issue access.",
)
url = path if absolute_url else f"{self._settings.forgejo_base_url}{path}"
headers = {}
if self._forgejo_token:
scheme = "Bearer" if self._auth_scheme.casefold() == "bearer" else "token"
headers["Authorization"] = f"{scheme} {self._forgejo_token}"
response = await self._client.request(
method,
url,
params=params,
headers=headers,
json=json_payload,
)
if response.is_success:
return response.json()
message = self._extract_error_message(response)
raise ForgejoClientError(f"{response.status_code} from Forgejo: {message}")
@staticmethod
def _extract_error_message(response: httpx.Response) -> str:
try:
payload = response.json()
except ValueError:
return response.text or response.reason_phrase
if isinstance(payload, dict):
raw_message = payload.get("message")
if isinstance(raw_message, str) and raw_message:
return raw_message
return response.reason_phrase