#!/usr/bin/env python3 from __future__ import annotations import json import os import shutil import subprocess import sys import tempfile from pathlib import Path from urllib.error import HTTPError from urllib.parse import quote from urllib.request import Request, urlopen DEFAULT_FORGEJO_BASE_URL = "https://aksal.cloud" DEFAULT_REPO = "Robot-U/robot-u-site" DEFAULT_KEY_TITLE = "robot-u-site-actions-clone" DEFAULT_SECRET_NAME = "CI_REPO_SSH_KEY" def main() -> int: root_dir = Path(__file__).resolve().parents[1] _load_env_file(root_dir / ".env") _load_env_file(root_dir / ".env.local") token = os.getenv("FORGEJO_API_TOKEN") or os.getenv("FORGEJO_TOKEN") if not token: print( "Set FORGEJO_API_TOKEN to a Forgejo token that can manage repo keys/secrets.", file=sys.stderr, ) return 1 base_url = (os.getenv("FORGEJO_BASE_URL") or DEFAULT_FORGEJO_BASE_URL).rstrip("/") repo = os.getenv("FORGEJO_REPO") or DEFAULT_REPO key_title = os.getenv("CI_CLONE_KEY_TITLE") or DEFAULT_KEY_TITLE secret_name = os.getenv("CI_CLONE_SECRET_NAME") or DEFAULT_SECRET_NAME owner, repo_name = _repo_parts(repo) ssh_keygen = shutil.which("ssh-keygen") if not ssh_keygen: print("ssh-keygen is required.", file=sys.stderr) return 1 with tempfile.TemporaryDirectory(prefix="robot-u-ci-key.") as temp_dir: key_path = Path(temp_dir) / "id_ed25519" subprocess.run( [ ssh_keygen, "-t", "ed25519", "-N", "", "-C", key_title, "-f", str(key_path), ], check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, ) public_key = key_path.with_suffix(".pub").read_text().strip() private_key = key_path.read_text() deploy_key = _create_deploy_key( base_url, token, owner, repo_name, key_title, public_key, ) _put_actions_secret(base_url, token, owner, repo_name, secret_name, private_key) print(f"Added read-only deploy key {deploy_key.get('id', '(unknown id)')} to {repo}.") print(f"Updated repository Actions secret {secret_name}.") print("Private key material was only held in memory and a temporary directory.") return 0 def _load_env_file(path: Path) -> None: if not path.exists(): return for raw_line in path.read_text().splitlines(): line = raw_line.strip() if not line or line.startswith("#") or "=" not in line: continue key, value = line.split("=", 1) key = key.strip().removeprefix("export ").strip() value = value.strip().strip('"').strip("'") os.environ.setdefault(key, value) def _repo_parts(repo: str) -> tuple[str, str]: parts = repo.split("/", 1) if len(parts) != 2 or not parts[0] or not parts[1]: raise SystemExit("FORGEJO_REPO must use owner/repo format.") return parts[0], parts[1] def _create_deploy_key( base_url: str, token: str, owner: str, repo: str, title: str, public_key: str, ) -> dict[str, object]: path = f"/api/v1/repos/{quote(owner)}/{quote(repo)}/keys" payload = { "title": title, "key": public_key, "read_only": True, } return _request_json(base_url, token, "POST", path, payload) def _put_actions_secret( base_url: str, token: str, owner: str, repo: str, name: str, value: str, ) -> None: path = f"/api/v1/repos/{quote(owner)}/{quote(repo)}/actions/secrets/{quote(name)}" _request_json(base_url, token, "PUT", path, {"data": value}) def _request_json( base_url: str, token: str, method: str, path: str, payload: dict[str, object], ) -> dict[str, object]: data = json.dumps(payload).encode("utf-8") request = Request( f"{base_url}{path}", data=data, method=method, headers={ "Authorization": f"token {token}", "Content-Type": "application/json", "Accept": "application/json", }, ) try: with urlopen(request, timeout=20) as response: if response.status == 204: return {} raw_body = response.read() except HTTPError as error: detail = error.read().decode("utf-8", errors="replace") print(f"Forgejo API returned {error.code}: {detail}", file=sys.stderr) raise SystemExit(1) from error if not raw_body: return {} decoded = json.loads(raw_body.decode("utf-8")) if isinstance(decoded, dict): return decoded return {} if __name__ == "__main__": raise SystemExit(main())