moved to asyncd

This commit is contained in:
alexander
2026-01-23 09:21:42 +01:00
parent efc2116de4
commit e93e743f65
11 changed files with 106 additions and 76 deletions

View File

@@ -51,6 +51,7 @@ async def build(trigger_list: list[str] | None = None):
# Copy to prod
print("Copying to prod directory...")
shutil.copytree(OUTPUT_DIR, PROD_DIR, dirs_exist_ok=True)
print("Done.")
if __name__ == "__main__":

View File

@@ -38,6 +38,7 @@ async def prepare_cheatsheets(config: CSInventoryConfig, outdir: str) -> list[CS
print("Unknow Source Type:", item.source.type)
except:
traceback.print_exc()
print("Error processing item:", item)
new_item = None
if new_items:

View File

@@ -55,5 +55,6 @@ async def trigger_all():
if __name__ == "__main__":
import uvicorn
uvicorn.run("main:app", host="0.0.0.0", port=8000)
uvicorn.run(app, host="0.0.0.0", port=8000, log_level="debug")

View File

@@ -1,23 +1,27 @@
from sources import CSSourceGitea, CSItem, CSInventoryItem
from sources.util import cache_cheatsheet, get_datestring
import requests
import httpx
from pathlib import Path
async def process_gitea(item: CSInventoryItem, outdir: str) -> list[CSItem] | None:
source: CSSourceGitea = item.source
commit_hash = get_release_commit_sha(source.base_url, source.owner, source.repo, source.tag)
asserts = list_release_assets(source.base_url, source.owner, source.repo, source.tag)
commit_hash = await get_release_commit_sha(source.base_url, source.owner, source.repo, source.tag)
asserts = await list_release_assets(source.base_url, source.owner, source.repo, source.tag)
asserts = filter(lambda a: a[1].endswith(".pdf"), asserts)
asserts = list(filter(lambda a: a[1].endswith(".pdf"), asserts))
asserts = list(map(lambda a: (a[0], f"{source.base_url}/repos/{source.owner}/{source.repo}/releases/download/{source.tag}/{a[0]}"), asserts))
print(f"Found {len(asserts)} PDF assets in Gitea release {source.owner}/{source.repo}@{source.tag}")
res = []
for a in asserts:
res_url = a[0]
if item.cache:
cache_url = cache_cheatsheet(a[0], outdir)
cache_url = await cache_cheatsheet(a[0], outdir)
if cache_url:
res_url = cache_url
else:
@@ -38,7 +42,7 @@ async def process_gitea(item: CSInventoryItem, outdir: str) -> list[CSItem] | No
return res
def get_release_commit_sha(base_url, owner, repo, tag_name, token=None):
async def get_release_commit_sha(base_url, owner, repo, tag_name, token=None):
"""
Resolve the commit SHA for a Gitea release tag.
@@ -50,49 +54,48 @@ def get_release_commit_sha(base_url, owner, repo, tag_name, token=None):
:return: commit SHA (str)
"""
headers = {}
if token:
headers["Authorization"] = f"token {token}"
session = requests.Session()
session.headers.update(headers)
async with httpx.AsyncClient() as client:
headers = {}
if token:
headers["Authorization"] = f"token {token}"
# 1) List tags and find the matching tag
tags_url = f"{base_url}/api/v1/repos/{owner}/{repo}/tags"
resp = session.get(tags_url)
resp.raise_for_status()
tags = resp.json()
# 1) List tags and find the matching tag
tags_url = f"{base_url}/api/v1/repos/{owner}/{repo}/tags"
resp = await client.get(tags_url, headers=headers)
resp.raise_for_status()
tags = resp.json()
tag = next((t for t in tags if t["name"] == tag_name), None)
if not tag:
raise ValueError(f"Tag '{tag_name}' not found")
tag = next((t for t in tags if t["name"] == tag_name), None)
if not tag:
raise ValueError(f"Tag '{tag_name}' not found")
# Lightweight tags usually already contain the commit SHA
commit_sha = tag.get("commit", {}).get("sha")
tag_obj_sha = tag.get("id")
# Lightweight tags usually already contain the commit SHA
commit_sha = tag.get("commit", {}).get("sha")
tag_obj_sha = tag.get("id")
# If commit.sha looks valid, return it
if commit_sha:
return commit_sha
# If commit.sha looks valid, return it
if commit_sha:
return commit_sha
# 2) Annotated tag: dereference via /git/tags/{sha}
if not tag_obj_sha:
raise RuntimeError("Tag object SHA missing; cannot dereference annotated tag")
# 2) Annotated tag: dereference via /git/tags/{sha}
if not tag_obj_sha:
raise RuntimeError("Tag object SHA missing; cannot dereference annotated tag")
git_tag_url = f"{base_url}/api/v1/repos/{owner}/{repo}/git/tags/{tag_obj_sha}"
resp = session.get(git_tag_url)
resp.raise_for_status()
annotated = resp.json()
git_tag_url = f"{base_url}/api/v1/repos/{owner}/{repo}/git/tags/{tag_obj_sha}"
resp = await client.get(git_tag_url, headers=headers)
resp.raise_for_status()
annotated = resp.json()
# The object pointed to by the tag (usually a commit)
target = annotated.get("object", {})
if target.get("type") != "commit":
raise RuntimeError(f"Tag points to a {target.get('type')} instead of a commit")
# The object pointed to by the tag (usually a commit)
target = annotated.get("object", {})
if target.get("type") != "commit":
raise RuntimeError(f"Tag points to a {target.get('type')} instead of a commit")
return target.get("sha")
def list_release_assets(base_url, owner, repo, tag, token=None):
async def list_release_assets(base_url, owner, repo, tag, token=None):
"""
Return a list of (download_url, filename) for all assets of a Gitea release.
@@ -103,26 +106,28 @@ def list_release_assets(base_url, owner, repo, tag, token=None):
:param token: optional API token
:returns: list of (download_url, filename) tuples
"""
headers = {}
if token:
headers["Authorization"] = f"token {token}"
# 1) Get release by tag
rel_url = f"{base_url}/api/v1/repos/{owner}/{repo}/releases/tags/{tag}"
rel_resp = requests.get(rel_url, headers=headers)
rel_resp.raise_for_status()
release = rel_resp.json()
async with httpx.AsyncClient() as client:
headers = {}
if token:
headers["Authorization"] = f"token {token}"
assets = release.get("assets", [])
result = []
# 1) Get release by tag
rel_url = f"{base_url}/api/v1/repos/{owner}/{repo}/releases/tags/{tag}"
rel_resp = await client.get(rel_url, headers=headers)
rel_resp.raise_for_status()
release: dict = rel_resp.json()
for asset in assets:
# Gitea asset info usually contains:
# - "browser_download_url" → direct URL
# - "name" → filename
download_url = asset.get("browser_download_url")
filename = asset.get("name")
if download_url and filename:
result.append((download_url, filename))
assets = release.get("assets", [])
result = []
for asset in assets:
# Gitea asset info usually contains:
# - "browser_download_url" → direct URL
# - "name" → filename
download_url = asset.get("browser_download_url")
filename = asset.get("name")
if download_url and filename:
result.append((download_url, filename))
return result

View File

@@ -6,7 +6,7 @@ async def process_plain_url(item: CSInventoryItem, outdir: str) -> CSItem | None
res_url = source.url
if item.cache:
cache_url = cache_cheatsheet(source.url, outdir)
cache_url = await cache_cheatsheet(source.url, outdir)
if cache_url:
res_url = cache_url
else:

View File

@@ -1,22 +1,32 @@
import hashlib
import requests
import httpx
import datetime
import os
from pathlib import Path
from urllib.parse import urlparse
def get_datestring() -> str:
return datetime.datetime.now().strftime("%d.%m.%y")
def cache_cheatsheet(url, outdir: str) -> str | None:
r = requests.get(url)
if not r.ok and r.headers.get("Content-Type") != "application/pdf":
return None
async def cache_cheatsheet(url, outdir: str) -> str | None:
print("Caching cheatsheet from", url)
try:
async with httpx.AsyncClient() as client:
r = await client.get(url, timeout=5.0)
if not r.is_success and r.headers.get("Content-Type") != "application/pdf":
return None
except httpx.TimeoutException:
print("Timeout fetching URL:", url)
return None
data = r.content
hashdata = hashlib.sha256(data)
url_base_name = Path(urlparse(url).path).stem
filesname = os.path.join("cache", f"{hashdata.hexdigest()}.pdf")
filesname = os.path.join("cache", f"{url_base_name}.pdf")
if not os.path.exists(os.path.join(outdir, "cache")):
os.mkdir(os.path.join(outdir, "cache"))