Files
TUM-typst4ei/src/sources/gitea.py
2026-01-24 15:46:24 +01:00

140 lines
4.9 KiB
Python

from sources import CSSourceGitea, CSItem, CSInventoryItem
from sources.util import cache_cheatsheet, get_datestring
import httpx
from pathlib import Path
from logger import get_worker_thread_logger
def process_gitea(item: CSInventoryItem, outdir: str) -> list[CSItem] | None:
    """
    Build one CSItem per PDF asset attached to a Gitea release.

    :param item: inventory entry; ``item.source`` describes the release
                 (base_url, optional fetch_url, owner, repo, tag, hide_repo)
    :param outdir: directory passed to ``cache_cheatsheet`` when ``item.cache`` is set
    :return: list of CSItem, one per PDF asset; assets whose caching yields
             no URL are left out
    """
    logger = get_worker_thread_logger()
    source: CSSourceGitea = item.source
    logger.info(f"Processing Gitea cheatsheet: {source.owner}/{source.repo}@{source.tag}")

    # API calls and downloads go through fetch_url when one is configured;
    # the URLs stored in the resulting items always use base_url.
    api_base = source.fetch_url or source.base_url
    commit_hash = get_release_commit_sha(api_base, source.owner, source.repo, source.tag)
    assets = list_release_assets(api_base, source.owner, source.repo, source.tag)

    # Each asset is a (download_url, filename) tuple; only the filename is
    # used because the download URLs are rebuilt from the configured hosts.
    pdf_names = [filename for _, filename in assets if filename.endswith(".pdf")]
    logger.info(f"Found {len(pdf_names)} PDF assets in Gitea release {source.owner}/{source.repo}@{source.tag}")

    # Values shared by every item of this release.
    release_path = f"{source.owner}/{source.repo}/releases/download/{source.tag}"
    repo_url = f"{source.base_url}/{source.owner}/{source.repo}" if not source.hide_repo else ""
    commit = commit_hash[:10] if commit_hash else ""
    author = item.author if item.author else source.owner

    res = []
    for filename in pdf_names:
        real_url = f"{source.base_url}/{release_path}/{filename}"
        if item.cache:
            fetch_url = f"{api_base}/{release_path}/{filename}"
            cache_url = cache_cheatsheet(fetch_url, outdir)
            if not cache_url:
                # Caching was requested but produced no URL: skip this asset.
                continue
            real_url = cache_url

        # The title is the file name (without extension) of the final URL.
        name = Path(real_url).stem
        res.append(CSItem(
            url=real_url,
            date=get_datestring(),
            commit=commit,
            author=author,
            title=f"{name}",
            id=item.id,
            git_repo=repo_url,
            git_repo_type="Gitea",
        ))
    return res
def get_release_commit_sha(base_url, owner, repo, tag_name, token=None):
    """
    Resolve the commit SHA for a Gitea release tag.

    The tag listing is read page by page, so a tag that is not on the first
    page of results is still found.

    :param base_url: e.g. "https://gitea.example.com"
    :param owner: repo owner
    :param repo: repository name
    :param tag_name: release tag (e.g. "v1.2.3")
    :param token: optional API token
    :return: commit SHA (str)
    :raises ValueError: if no tag named ``tag_name`` is listed
    :raises RuntimeError: if an annotated tag cannot be dereferenced to a commit
    :raises httpx.HTTPStatusError: if an API request returns an error status
    """
    headers = {"Authorization": f"token {token}"} if token else {}
    tags_url = f"{base_url}/api/v1/repos/{owner}/{repo}/tags"

    with httpx.Client() as client:
        # 1) Walk the paginated tag list until the matching tag shows up
        tag = None
        seen_names = set()
        page = 1
        while True:
            resp = client.get(tags_url, headers=headers, params={"page": page})
            resp.raise_for_status()
            tags = resp.json()

            tag = next((t for t in tags if t["name"] == tag_name), None)
            if tag is not None:
                break

            page_names = {t["name"] for t in tags}
            # Stop on an empty page, and also on a page that brings nothing
            # new -- that guards against a server that ignores `page` and
            # would otherwise keep us looping forever.
            if page_names <= seen_names:
                break
            seen_names |= page_names
            page += 1

        if tag is None:
            raise ValueError(f"Tag '{tag_name}' not found")

        # Lightweight tags usually already contain the commit SHA.
        # `or {}` also covers an explicit JSON null for "commit".
        commit_sha = (tag.get("commit") or {}).get("sha")
        if commit_sha:
            return commit_sha

        # 2) Annotated tag: dereference via /git/tags/{sha}
        tag_obj_sha = tag.get("id")
        if not tag_obj_sha:
            raise RuntimeError("Tag object SHA missing; cannot dereference annotated tag")

        git_tag_url = f"{base_url}/api/v1/repos/{owner}/{repo}/git/tags/{tag_obj_sha}"
        resp = client.get(git_tag_url, headers=headers)
        resp.raise_for_status()
        annotated = resp.json()

        # The object pointed to by the tag (usually a commit)
        target = annotated.get("object") or {}
        if target.get("type") != "commit":
            raise RuntimeError(f"Tag points to a {target.get('type')} instead of a commit")
        return target.get("sha")
def list_release_assets(base_url, owner, repo, tag, token=None):
    """
    Collect the assets attached to a Gitea release, looked up by tag.

    :param base_url: Gitea host URL, e.g. "https://gitea.example.com"
    :param owner: repository owner
    :param repo: repository name
    :param tag: release tag name
    :param token: optional API token, sent as an "Authorization: token ..." header
    :returns: list of (download_url, filename) tuples; assets lacking either
              value are omitted
    """
    auth_headers = {"Authorization": f"token {token}"} if token else {}
    endpoint = f"{base_url}/api/v1/repos/{owner}/{repo}/releases/tags/{tag}"

    # Fetch the release description for the given tag
    with httpx.Client() as client:
        response = client.get(endpoint, headers=auth_headers)
        response.raise_for_status()
        release: dict = response.json()

    pairs = []
    for entry in release.get("assets", []):
        # "browser_download_url" is the direct URL, "name" the filename
        url = entry.get("browser_download_url")
        name = entry.get("name")
        if not (url and name):
            continue
        pairs.append((url, name))
    return pairs