Moved back to sync and added logging
.gitignore
@@ -7,4 +7,6 @@ __pycache__/
 package-lock.json
 package.json
 
 cheatsheet_inventory.json
+*.log
+config.yaml

.vscode/tasks.json
@@ -39,6 +39,17 @@
                 "kind": "build",
                 "isDefault": true
             }
+        },
+        {
+            "label": "trigger local build all",
+            "type": "shell",
+            "command": "curl",
+            "args": ["http://localhost:8000/trigger/all"],
+            "group": {
+                "kind": "build",
+                "isDefault": true
+            },
+            "problemMatcher": []
         }
 
 ]

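The new task shells out to curl against the trigger endpoint. A quick sketch of the same calls from Python, using httpx (already a dependency); the item name below is illustrative. Note that main.py declares both trigger routes with @app.post, so the request must be a POST; a bare `curl URL` as in the task's args issues a GET, which a POST-only route would reject with 405.

# Hedged sketch: triggering the local build server from Python.
import httpx

# Rebuild everything (what "trigger local build all" requests):
resp = httpx.post("http://localhost:8000/trigger/all")
print(resp.json())  # {"status": "ok", "requested": "all"}

# Rebuild selected items via the TriggerRequest payload
# ("example-cheatsheet" is a hypothetical item id):
resp = httpx.post("http://localhost:8000/trigger",
                  json={"items": ["example-cheatsheet"]})
print(resp.json())
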
@@ -12,10 +12,13 @@ httptools==0.7.1
 httpx==0.28.1
 idna==3.11
 itsdangerous==2.2.0
+janus==2.0.0
 Jinja2==3.1.6
+libsass==0.23.0
 livereload==2.7.1
 MarkupSafe==3.0.3
 pydantic==2.12.5
+pydantic-settings==2.12.0
 pydantic_core==2.41.5
 python-dotenv==1.2.1
 PyYAML==6.0.3

src/build.py
@@ -1,58 +1,57 @@
-import asyncio
 from jinja2 import Environment, FileSystemLoader, select_autoescape
 
 import shutil
-import datetime
+import time
 import os
+import datetime
 
 from sources import CSItem
 from inventory import load_cheatsheet_inventory, prepare_cheatsheets
+from config import get_settings
+from logger import get_worker_thread_logger
 
-INVENTORY_FILE = "cheatsheet_inventory.json"
-STATIC_DIR = "static"
-TEMPLATES_DIR = "templates"
-OUTPUT_DIR = "out"
-PROD_DIR = "prod"
-
-
-async def build(trigger_list: list[str] | None = None):
-    inv_raw = load_cheatsheet_inventory(INVENTORY_FILE)
+def build(trigger_list: list[str] | None = None):
+    start_time = time.time()
+    settings = get_settings()
+
+    inv_raw = load_cheatsheet_inventory(settings.paths.inventory_file)
 
     # Clear output directory
-    shutil.rmtree(OUTPUT_DIR, ignore_errors=True)
-    shutil.copytree(STATIC_DIR, OUTPUT_DIR)
+    shutil.rmtree(settings.paths.output, ignore_errors=True)
+    shutil.copytree(settings.paths.static, settings.paths.output)
 
-    inv: list[CSItem] = await prepare_cheatsheets(inv_raw, OUTPUT_DIR)
+    inv: list[CSItem] = prepare_cheatsheets(inv_raw, settings.paths.output)
 
-    if not os.path.exists(PROD_DIR):
-        os.mkdir(PROD_DIR)
+    if not os.path.exists(settings.paths.prod):
+        os.mkdir(settings.paths.prod)
 
     env = Environment(
-        loader=FileSystemLoader(TEMPLATES_DIR),
+        loader=FileSystemLoader(settings.paths.templates),
         autoescape=select_autoescape()
     )
 
     index = env.get_template("index.html.j2")
 
-    print(f"{len(inv)} Cheatsheets")
+    logger = get_worker_thread_logger()
+    logger.info("Generated cheatsheets:")
     for i in inv:
-        print("-", i)
+        logger.info(f"- {i}")
 
     thisYear = datetime.datetime.now().year
 
-    with open(f"{OUTPUT_DIR}/index.html", "w", encoding="utf-8") as f:
+    with open(f"{settings.paths.output}/index.html", "w", encoding="utf-8") as f:
         f.write(index.render(items=inv, thisYear=thisYear))
 
-    with open(f"{OUTPUT_DIR}/impressum.html", "w", encoding="utf-8") as f:
+    with open(f"{settings.paths.output}/impressum.html", "w", encoding="utf-8") as f:
         f.write(env.get_template("impressum.html.j2").render(thisYear=thisYear))
 
-    with open(f"{OUTPUT_DIR}/license.html", "w", encoding="utf-8") as f:
+    with open(f"{settings.paths.output}/license.html", "w", encoding="utf-8") as f:
         f.write(env.get_template("license.html.j2").render(thisYear=thisYear))
 
     # Copy to prod
-    print("Copying to prod directory...")
-    shutil.copytree(OUTPUT_DIR, PROD_DIR, dirs_exist_ok=True)
-    print("Done.")
+    logger.info("Copying output to production directory")
+    shutil.copytree(settings.paths.output, settings.paths.prod, dirs_exist_ok=True)
+    logger.info("Done after {:.2f}s".format(time.time() - start_time))
 
 
 if __name__ == "__main__":
-    asyncio.run(build())
+    build()

src/config.py (new file)
@@ -0,0 +1,51 @@
+from pydantic import Field
+from pydantic_settings import BaseSettings, PydanticBaseSettingsSource, YamlConfigSettingsSource
+
+
+class PathsConfig(BaseSettings):
+    """Path configuration settings"""
+    inventory_file: str = Field(default="cheatsheet_inventory.json", description="Cheatsheet inventory file")
+    templates: str = Field(default="templates", description="Templates directory")
+    static: str = Field(default="static", description="Static files directory")
+    output: str = Field(default="prod", description="Output directory")
+    prod: str = Field(default="prod", description="Production directory")
+
+
+class Settings(BaseSettings):
+    """Main application settings loaded from YAML and environment variables"""
+
+    paths: PathsConfig = Field(default_factory=PathsConfig)
+    request_timeout: float = Field(default=2.0, description="Request timeout in seconds")
+    log_level: str = Field(default="DEBUG", description="Logging level")
+
+    @classmethod
+    def settings_customise_sources(
+        cls,
+        settings_cls: type[BaseSettings],
+        init_settings: PydanticBaseSettingsSource,
+        env_settings: PydanticBaseSettingsSource,
+        dotenv_settings: PydanticBaseSettingsSource,
+        file_secret_settings: PydanticBaseSettingsSource,
+    ) -> tuple[PydanticBaseSettingsSource, ...]:
+        return (
+            init_settings,
+            env_settings,
+            dotenv_settings,
+            YamlConfigSettingsSource(cls, yaml_file="config.yaml"),
+        )
+
+
+# Global settings instance
+global_settings: Settings = None
+
+
+def get_settings() -> Settings:
+    """Get the global settings instance, loading it if necessary"""
+    global global_settings
+    if global_settings is None:
+        global_settings = Settings()
+    return global_settings
+
+
+def load_settings() -> Settings:
+    global global_settings
+    global_settings = Settings()

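A minimal sketch of how these settings resolve, assuming src/ is on the import path and the process runs from the repository root; the config.yaml content below is hypothetical. Because settings_customise_sources places the YAML source last, YAML values override only the field defaults, while init arguments, environment variables, and dotenv values all take precedence over the file.

# Hedged sketch: exercising src/config.py with a hypothetical config.yaml.
from pathlib import Path

from config import get_settings, load_settings  # assumes src/ is on sys.path

# Hypothetical config file overriding two defaults:
Path("config.yaml").write_text(
    "log_level: INFO\n"
    "paths:\n"
    "  output: out\n"
)

settings = load_settings()         # forces a re-read of config.yaml
print(settings.log_level)          # INFO        (from YAML)
print(settings.paths.output)       # out         (from YAML)
print(settings.paths.templates)    # templates   (field default)
assert get_settings() is settings  # later callers reuse the cached instance
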
src/inventory.py
@@ -4,7 +4,7 @@ import traceback
 from sources import CSInventoryConfig, CSItem, CheatsheetSourceType
 from sources.plain import process_plain_url
 from sources.gitea import process_gitea
+from logger import get_worker_thread_logger
 
 def load_cheatsheet_inventory(file: str) -> CSInventoryConfig:
     if not os.path.exists(file):
@@ -20,33 +20,35 @@ def load_cheatsheet_inventory(file: str) -> CSInventoryConfig:
     return res
 
 
-async def prepare_cheatsheets(config: CSInventoryConfig, outdir: str) -> list[CSItem]:
+def prepare_cheatsheets(config: CSInventoryConfig, outdir: str) -> list[CSItem]:
     res: list[CSItem] = []
 
+    logger = get_worker_thread_logger()
+
     for item in config.items:
        new_items = []
 
        try:
            match item.source.type:
                case CheatsheetSourceType.GITEA_SOURCE:
-                    new_items += await process_gitea(item, outdir)
+                    new_items += process_gitea(item, outdir)
 
                case CheatsheetSourceType.PLAIN_URL:
-                    new_items.append(await process_plain_url(item, outdir))
+                    new_items.append(process_plain_url(item, outdir))
 
                case _:
-                    print("Unknow Source Type:", item.source.type)
+                    logger.warning("Unknown Source Type: %s", item.source.type)
        except:
-            traceback.print_exc()
-            print("Error processing item:", item)
+            logger.error("Error processing item: %s", item)
+            logger.error(traceback.format_exc())
            new_item = None
 
        if new_items:
            for new_item in new_items:
-                print("->", new_item)
+                new_item: CSItem = new_item
+                logger.debug(f"-> {new_item.title} ({new_item.url})")
                res.append(new_item)
 
     return res

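Worth noting in passing: the new calls mix two formatting styles. logger.warning("Unknown Source Type: %s", ...) defers string formatting until a handler actually emits the record, while the f-string passed to logger.debug is built eagerly even when DEBUG is filtered out. A two-line illustration:

import logging

logger = logging.getLogger("example")
value = object()

logger.warning("Unknown Source Type: %s", value)  # %s formatted lazily, only if emitted
logger.debug(f"-> {value}")                       # f-string built eagerly, even if dropped
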
src/logger.py (new file)
@@ -0,0 +1,60 @@
+# Source - https://stackoverflow.com/a
+# Posted by Chris, modified by community. See post 'Timeline' for change history
+# Retrieved 2026-01-24, License - CC BY-SA 4.0
+import logging
+
+LOGGING_CONFIG = {
+    'version': 1,
+    'disable_existing_loggers': True,
+    'formatters': {
+        'standard': {
+            'format': '%(asctime)s [%(levelname)-7s][%(name)-24s]: %(message)s'
+        },
+    },
+    'handlers': {
+        'default': {
+            'formatter': 'standard',
+            'class': 'logging.StreamHandler',
+            'stream': 'ext://sys.stdout',  # Default is stderr
+        },
+        'stream_handler': {
+            'formatter': 'standard',
+            'class': 'logging.StreamHandler',
+            'stream': 'ext://sys.stdout',  # Default is stderr
+        },
+        'file_handler': {
+            'formatter': 'standard',
+            'class': 'logging.handlers.RotatingFileHandler',
+            'filename': 'app.log',
+            'maxBytes': 1024 * 1024 * 1,  # = 1MB
+            'backupCount': 3,
+        },
+    },
+    'loggers': {
+        'uvicorn': {
+            'handlers': ['default', 'file_handler'],
+            'level': 'TRACE',
+            'propagate': False
+        },
+        'uvicorn.access': {
+            'handlers': ['stream_handler', 'file_handler'],
+            'level': 'TRACE',
+            'propagate': False
+        },
+        'uvicorn.error': {
+            'handlers': ['stream_handler', 'file_handler'],
+            'level': 'TRACE',
+            'propagate': False
+        },
+        'uvicorn.asgi': {
+            'handlers': ['stream_handler', 'file_handler'],
+            'level': 'TRACE',
+            'propagate': False
+        },
+
+    },
+}
+
+
+def get_worker_thread_logger() -> logging.Logger:
+    logger = logging.getLogger("uvicorn").getChild("build_thread")
+    return logger

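Why the worker can get away with a bare getChild call: child loggers carry no handlers of their own, so records from uvicorn.build_thread propagate up to the "uvicorn" logger configured above and land in the same stdout and rotating-file handlers. A small sketch; note the 'TRACE' level in the dict is registered by uvicorn itself, so the config is meant to be handed to uvicorn.run (as main.py does) rather than applied standalone via logging.config.dictConfig.

# Sketch of the child-logger relationship behind get_worker_thread_logger(),
# assuming uvicorn has already applied LOGGING_CONFIG (see main.py).
import logging

worker_logger = logging.getLogger("uvicorn").getChild("build_thread")
print(worker_logger.name)         # uvicorn.build_thread
print(worker_logger.handlers)     # [] - no handlers of its own
print(worker_logger.parent.name)  # uvicorn - owns 'default' and 'file_handler'

# Records flow uvicorn.build_thread -> uvicorn -> stdout and app.log;
# propagation stops at "uvicorn" because it sets propagate: False.
worker_logger.info("message from the build thread")
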
src/main.py
@@ -1,10 +1,13 @@
-from fastapi import FastAPI, HTTPException, Depends
-from fastapi.security import HTTPBasic, HTTPBasicCredentials
+from fastapi import FastAPI
 from pydantic import BaseModel
-import queue
-import asyncio
+import janus
+import threading
+import traceback
+from logging import getLogger
 from contextlib import asynccontextmanager
-import os
+from logger import LOGGING_CONFIG, get_worker_thread_logger
+from config import load_settings
 
 from build import build as run_build
 
@@ -13,48 +16,58 @@ class TriggerRequest(BaseModel):
     items: list[str]
 
 
-build_queue: asyncio.Queue = None
+build_queue: janus.Queue
+build_queue_sync: janus.SyncQueue
+build_queue_async: janus.AsyncQueue
 
-async def worker():
-    print("Build queue thread started")
-    while True:
-        selected = await build_queue.get()
-        print("Processing build request for:", selected)
-        await run_build(trigger_list=selected)
+def worker():
+    logger = get_worker_thread_logger()
+    logger.info("Build queue thread started")
+    error_counter = 0
+    while error_counter < 100:
+        try:
+            while True:
+                selected = build_queue_sync.get()
+                logger.info(f"Processing build request for: {selected}")
+                run_build(trigger_list=selected)
+        except:
+            traceback.print_exc()
+            error_counter += 1
 
 
 @asynccontextmanager
 async def lifespan(app: FastAPI):
-    global build_queue
-    build_queue = asyncio.Queue()
-    task = asyncio.create_task(worker())
-
-    try:
-        yield
-    finally:
-        task.cancel()
-
-        try:
-            await task
-        except asyncio.CancelledError:
-            pass
+    global build_queue, build_queue_sync, build_queue_async
+    settings = load_settings()
+
+    logger = getLogger("uvicorn").getChild("lifespan")
+
+    build_queue = janus.Queue()
+    build_queue_sync = build_queue.sync_q
+    build_queue_async = build_queue.async_q
+
+    t = threading.Thread(target=worker)
+    t.daemon = True
+    t.start()
+
+    yield
 
 app = FastAPI(title="FSSquared Trigger API", lifespan=lifespan)
 
 
 @app.post("/trigger")
 async def trigger(payload: TriggerRequest):
-    build_queue.put(payload.items)
+    await build_queue_async.put(payload.items)
     return {"status": "ok", "requested": payload.items}
 
 
 @app.post("/trigger/all")
 async def trigger_all():
-    await build_queue.put(None)
+    await build_queue_async.put(None)
    return {"status": "ok", "requested": "all"}
 
 
 if __name__ == "__main__":
     import uvicorn
-    uvicorn.run(app, host="0.0.0.0", port=8000, log_level="debug")
+    uvicorn.run(app, host="0.0.0.0", port=8000, log_config=LOGGING_CONFIG)

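The janus queue is the piece that lets the async endpoints and the synchronous worker thread share one queue: the endpoint side awaits async_q.put while the thread blocks on sync_q.get, and janus keeps the two views consistent. A self-contained sketch of that pattern follows; the names in it are illustrative, not from the project.

# Standalone sketch of the janus bridge used in main.py: an async producer
# feeds a plain consumer thread through one queue with two views.
import asyncio
import threading

import janus


def consumer(sync_q: janus.SyncQueue) -> None:
    while True:
        item = sync_q.get()      # blocking get - fine in a plain thread
        try:
            if item is None:     # shutdown sentinel
                return
            print("processing", item)
        finally:
            sync_q.task_done()


async def main() -> None:
    queue: janus.Queue = janus.Queue()  # must be created inside a running loop
    t = threading.Thread(target=consumer, args=(queue.sync_q,), daemon=True)
    t.start()

    await queue.async_q.put("build-request")  # what /trigger does
    await queue.async_q.put(None)             # stop the consumer
    await queue.async_q.join()                # wait until both are processed

    queue.close()
    await queue.wait_closed()


asyncio.run(main())
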
src/sources/gitea.py
@@ -5,10 +5,10 @@ import httpx
 from pathlib import Path
 
 
-async def process_gitea(item: CSInventoryItem, outdir: str) -> list[CSItem] | None:
+def process_gitea(item: CSInventoryItem, outdir: str) -> list[CSItem] | None:
     source: CSSourceGitea = item.source
-    commit_hash = await get_release_commit_sha(source.base_url, source.owner, source.repo, source.tag)
-    asserts = await list_release_assets(source.base_url, source.owner, source.repo, source.tag)
+    commit_hash = get_release_commit_sha(source.base_url, source.owner, source.repo, source.tag)
+    asserts = list_release_assets(source.base_url, source.owner, source.repo, source.tag)
 
     asserts = list(filter(lambda a: a[1].endswith(".pdf"), asserts))
     asserts = list(map(lambda a: (a[0], f"{source.base_url}/repos/{source.owner}/{source.repo}/releases/download/{source.tag}/{a[0]}"), asserts))
@@ -21,7 +21,7 @@ async def process_gitea(item: CSInventoryItem, outdir: str) -> list[CSItem] | None:
         res_url = a[0]
 
         if item.cache:
-            cache_url = await cache_cheatsheet(a[0], outdir)
+            cache_url = cache_cheatsheet(a[0], outdir)
            if cache_url:
                res_url = cache_url
            else:
@@ -42,7 +42,7 @@ async def process_gitea(item: CSInventoryItem, outdir: str) -> list[CSItem] | None:
 
     return res
 
-async def get_release_commit_sha(base_url, owner, repo, tag_name, token=None):
+def get_release_commit_sha(base_url, owner, repo, tag_name, token=None):
     """
     Resolve the commit SHA for a Gitea release tag.
 
@@ -55,14 +55,14 @@ async def get_release_commit_sha(base_url, owner, repo, tag_name, token=None):
     """
 
-    async with httpx.AsyncClient() as client:
+    with httpx.Client() as client:
         headers = {}
         if token:
             headers["Authorization"] = f"token {token}"
 
         # 1) List tags and find the matching tag
         tags_url = f"{base_url}/api/v1/repos/{owner}/{repo}/tags"
-        resp = await client.get(tags_url, headers=headers)
+        resp = client.get(tags_url, headers=headers)
         resp.raise_for_status()
         tags = resp.json()
 
@@ -83,7 +83,7 @@ async def get_release_commit_sha(base_url, owner, repo, tag_name, token=None):
             raise RuntimeError("Tag object SHA missing; cannot dereference annotated tag")
 
         git_tag_url = f"{base_url}/api/v1/repos/{owner}/{repo}/git/tags/{tag_obj_sha}"
-        resp = await client.get(git_tag_url, headers=headers)
+        resp = client.get(git_tag_url, headers=headers)
         resp.raise_for_status()
         annotated = resp.json()
 
@@ -95,7 +95,7 @@ async def get_release_commit_sha(base_url, owner, repo, tag_name, token=None):
         return target.get("sha")
 
 
-async def list_release_assets(base_url, owner, repo, tag, token=None):
+def list_release_assets(base_url, owner, repo, tag, token=None):
     """
     Return a list of (download_url, filename) for all assets of a Gitea release.
 
@@ -107,14 +107,14 @@ async def list_release_assets(base_url, owner, repo, tag, token=None):
     :returns: list of (download_url, filename) tuples
     """
 
-    async with httpx.AsyncClient() as client:
+    with httpx.Client() as client:
         headers = {}
         if token:
             headers["Authorization"] = f"token {token}"
 
         # 1) Get release by tag
         rel_url = f"{base_url}/api/v1/repos/{owner}/{repo}/releases/tags/{tag}"
-        rel_resp = await client.get(rel_url, headers=headers)
+        rel_resp = client.get(rel_url, headers=headers)
         rel_resp.raise_for_status()
         release: dict = rel_resp.json()

src/sources/plain.py
@@ -1,4 +1,3 @@
-import hashlib
 import httpx
 import datetime
 import os
@@ -9,13 +8,13 @@ def get_datestring() -> str:
     return datetime.datetime.now().strftime("%d.%m.%y")
 
 
-async def cache_cheatsheet(url, outdir: str) -> str | None:
+def cache_cheatsheet(url, outdir: str) -> str | None:
 
     print("Caching cheatsheet from", url)
 
     try:
-        async with httpx.AsyncClient() as client:
-            r = await client.get(url, timeout=5.0)
+        with httpx.Client() as client:
+            r = client.get(url, timeout=5.0)
             if not r.is_success and r.headers.get("Content-Type") != "application/pdf":
                 return None
     except httpx.TimeoutException: