173 lines
4.7 KiB
Python
Executable file
173 lines
4.7 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import os
|
|
import shutil
|
|
import subprocess
|
|
import sys
|
|
import tempfile
|
|
from pathlib import Path
|
|
from urllib.error import HTTPError
|
|
from urllib.parse import quote
|
|
from urllib.request import Request, urlopen
|
|
|
|
|
|
# Fallback settings, used when the matching environment variable is unset or empty.

# Root URL of the Forgejo instance (overridden by FORGEJO_BASE_URL).
DEFAULT_FORGEJO_BASE_URL = "https://aksal.cloud"

# Target repository in owner/repo form (overridden by FORGEJO_REPO).
DEFAULT_REPO = "Robot-U/robot-u-site"

# Title of the deploy key, also passed to ssh-keygen as the key comment
# (overridden by CI_CLONE_KEY_TITLE).
DEFAULT_KEY_TITLE = "robot-u-site-actions-clone"

# Name of the repository Actions secret that receives the private key
# (overridden by CI_CLONE_SECRET_NAME).
DEFAULT_SECRET_NAME = "CI_REPO_SSH_KEY"
|
|
|
|
|
|
def main() -> int:
    """Create a read-only deploy key and store its private half as an Actions secret.

    Configuration comes from the process environment, topped up from ``.env``
    and ``.env.local`` in the directory one level above this script's folder.
    A fresh ed25519 key pair is generated with ``ssh-keygen`` inside a
    temporary directory, the public key is registered on the repository as a
    read-only deploy key, and the private key is written to the repository's
    Actions secret.

    Returns:
        0 on success; 1 when no API token is configured, ``ssh-keygen`` is not
        on PATH, or ``ssh-keygen`` itself fails.

    Raises:
        SystemExit: from the helpers, when FORGEJO_REPO is malformed or the
            Forgejo API rejects a request.
    """
    root_dir = Path(__file__).resolve().parents[1]
    # The loader never overwrites existing variables, so the real environment
    # wins, then .env, then .env.local.
    # NOTE(review): .env.local usually overrides .env; confirm this order is intended.
    _load_env_file(root_dir / ".env")
    _load_env_file(root_dir / ".env.local")

    token = os.getenv("FORGEJO_API_TOKEN") or os.getenv("FORGEJO_TOKEN")
    if not token:
        print(
            "Set FORGEJO_API_TOKEN to a Forgejo token that can manage repo keys/secrets.",
            file=sys.stderr,
        )
        return 1

    # `or` (rather than a getenv default) makes empty variables fall back too.
    base_url = (os.getenv("FORGEJO_BASE_URL") or DEFAULT_FORGEJO_BASE_URL).rstrip("/")
    repo = os.getenv("FORGEJO_REPO") or DEFAULT_REPO
    key_title = os.getenv("CI_CLONE_KEY_TITLE") or DEFAULT_KEY_TITLE
    secret_name = os.getenv("CI_CLONE_SECRET_NAME") or DEFAULT_SECRET_NAME
    owner, repo_name = _repo_parts(repo)

    ssh_keygen = shutil.which("ssh-keygen")
    if not ssh_keygen:
        print("ssh-keygen is required.", file=sys.stderr)
        return 1

    with tempfile.TemporaryDirectory(prefix="robot-u-ci-key.") as temp_dir:
        key_path = Path(temp_dir) / "id_ed25519"
        try:
            subprocess.run(
                [
                    ssh_keygen,
                    "-t",
                    "ed25519",
                    "-N",
                    "",  # empty passphrase: CI must use the key unattended
                    "-C",
                    key_title,
                    "-f",
                    str(key_path),
                ],
                check=True,
                stdout=subprocess.DEVNULL,
                # Captured (not discarded) so a failure can be explained below.
                stderr=subprocess.PIPE,
                text=True,
            )
        except subprocess.CalledProcessError as error:
            detail = (error.stderr or "").strip()
            print(
                f"ssh-keygen failed with exit code {error.returncode}: {detail}",
                file=sys.stderr,
            )
            return 1

        # Read both halves into memory before the directory is removed.
        public_key = key_path.with_suffix(".pub").read_text(encoding="utf-8").strip()
        private_key = key_path.read_text(encoding="utf-8")

    # The deploy key is registered first; if storing the secret then fails,
    # the helper exits and the deploy key stays registered on the repository.
    deploy_key = _create_deploy_key(
        base_url,
        token,
        owner,
        repo_name,
        key_title,
        public_key,
    )
    _put_actions_secret(base_url, token, owner, repo_name, secret_name, private_key)

    print(f"Added read-only deploy key {deploy_key.get('id', '(unknown id)')} to {repo}.")
    print(f"Updated repository Actions secret {secret_name}.")
    print("Private key material was only held in memory and a temporary directory.")
    return 0
|
|
|
|
|
|
def _load_env_file(path: Path) -> None:
    """Load ``KEY=VALUE`` pairs from a dotenv-style file into ``os.environ``.

    Blank lines, ``#`` comment lines, lines without ``=`` and lines with an
    empty key are skipped. A leading ``export `` on the key is dropped. One
    pair of matching surrounding quotes (single or double) is removed from the
    value; quotes inside the value are preserved. Variables that already exist
    in the environment are never overwritten.

    Args:
        path: Location of the file; a missing path or a non-file is ignored.
    """
    # is_file() also covers a directory that happens to share the name.
    if not path.is_file():
        return

    for raw_line in path.read_text(encoding="utf-8").splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue

        # Split on the first "=" only, so values may themselves contain "=".
        key, _, value = line.partition("=")
        key = key.strip().removeprefix("export ").strip()
        if not key:
            # os.environ rejects empty names with ValueError; skip the line.
            continue

        value = value.strip()
        # Strip exactly one matching pair of quotes. Chained .strip() calls
        # would also eat quotes that belong to the value itself.
        if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
            value = value[1:-1]

        os.environ.setdefault(key, value)
|
|
|
|
|
|
def _repo_parts(repo: str) -> tuple[str, str]:
    """Split an ``owner/repo`` string into its two components.

    Args:
        repo: Repository identifier; surrounding whitespace is ignored.

    Returns:
        The ``(owner, repository name)`` pair.

    Raises:
        SystemExit: if ``repo`` is not exactly two non-empty segments
            separated by a single ``/``.
    """
    owner, separator, name = repo.strip().partition("/")
    # A second "/" would pass through URL quoting unescaped and address a
    # different API path, so reject it here rather than at request time.
    if not separator or not owner or not name or "/" in name:
        raise SystemExit("FORGEJO_REPO must use owner/repo format.")
    return owner, name
|
|
|
|
|
|
def _create_deploy_key(
    base_url: str,
    token: str,
    owner: str,
    repo: str,
    title: str,
    public_key: str,
) -> dict[str, object]:
    """Register ``public_key`` as a read-only deploy key on ``owner/repo``.

    Sends a POST to the repository's ``keys`` endpoint and returns whatever
    JSON object the request helper hands back.
    """
    owner_segment = quote(owner)
    repo_segment = quote(repo)
    endpoint = f"/api/v1/repos/{owner_segment}/{repo_segment}/keys"

    # read_only keeps the key limited to clone/fetch access.
    body: dict[str, object] = {"title": title, "key": public_key, "read_only": True}
    return _request_json(base_url, token, "POST", endpoint, body)
|
|
|
|
|
|
def _put_actions_secret(
    base_url: str,
    token: str,
    owner: str,
    repo: str,
    name: str,
    value: str,
) -> None:
    """Create or replace the Actions secret ``name`` on ``owner/repo``.

    Sends a PUT to the repository's ``actions/secrets`` endpoint with the
    secret value under the ``data`` key; the response body is ignored.
    """
    owner_segment = quote(owner)
    repo_segment = quote(repo)
    secret_segment = quote(name)
    endpoint = f"/api/v1/repos/{owner_segment}/{repo_segment}/actions/secrets/{secret_segment}"

    body = {"data": value}
    _request_json(base_url, token, "PUT", endpoint, body)
|
|
|
|
|
|
def _request_json(
    base_url: str,
    token: str,
    method: str,
    path: str,
    payload: dict[str, object],
) -> dict[str, object]:
    """Send a JSON request to the Forgejo API and return the decoded object.

    Args:
        base_url: Instance root URL without a trailing slash.
        token: API token, sent as ``Authorization: token <token>``.
        method: HTTP method, e.g. ``"POST"`` or ``"PUT"``.
        path: Absolute API path appended to ``base_url``.
        payload: JSON-serialisable request body.

    Returns:
        The response body when it is a JSON object; an empty dict for a 204
        response, an empty body, or a JSON value that is not an object.

    Raises:
        SystemExit: with status 1, after printing a message to stderr, when
            the server answers with an HTTP error, the server cannot be
            reached, or a non-empty response body is not valid JSON.
    """
    request = Request(
        f"{base_url}{path}",
        data=json.dumps(payload).encode("utf-8"),
        method=method,
        headers={
            "Authorization": f"token {token}",
            "Content-Type": "application/json",
            "Accept": "application/json",
        },
    )

    try:
        with urlopen(request, timeout=20) as response:
            # 204 No Content carries no body, so there is nothing to read.
            raw_body = b"" if response.status == 204 else response.read()
    except HTTPError as error:
        detail = error.read().decode("utf-8", errors="replace")
        print(f"Forgejo API returned {error.code}: {detail}", file=sys.stderr)
        raise SystemExit(1) from error
    except OSError as error:
        # URLError (DNS failure, refused connection, TLS problems) and socket
        # timeouts are all OSError subclasses. HTTPError is one too, which is
        # why it is handled first.
        print(f"Could not reach Forgejo at {base_url}: {error}", file=sys.stderr)
        raise SystemExit(1) from error

    if not raw_body:
        return {}

    try:
        decoded = json.loads(raw_body.decode("utf-8"))
    except ValueError as error:
        # Covers both JSONDecodeError and UnicodeDecodeError, e.g. an HTML
        # page served by a proxy or a misconfigured base URL.
        print(
            f"Forgejo API returned a response that is not valid JSON: {error}",
            file=sys.stderr,
        )
        raise SystemExit(1) from error

    return decoded if isinstance(decoded, dict) else {}
|
|
|
|
|
|
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main())
|