saikyo-packages-src/saikyo-av-web/bin/saikyo-av-web

289 lines
9.4 KiB
Python

#!/usr/bin/env python3
import argparse
import datetime as _dt
import html
import json
import os
import pathlib
import re
import socketserver
import sys
import urllib.parse
from http import HTTPStatus
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
def _now_utc_iso() -> str:
    """Return the current UTC time as an ISO-8601 string with second precision.

    The value carries an explicit UTC offset, e.g. ``2024-01-31T12:34:56+00:00``.
    """
    current = _dt.datetime.now(_dt.timezone.utc)
    # With the microseconds zeroed, isoformat() emits no fractional part.
    return current.replace(microsecond=0).isoformat()
# Report ids are turned into file names, so only a conservative character set
# is accepted. "\Z" anchors at the very end of the string; "$" would also match
# just before a trailing newline.
_SAFE_REPORT_ID_RE = re.compile(r"^[A-Za-z0-9._-]{1,128}\Z")


def _safe_report_id(report_id: str) -> str:
    """Validate *report_id* and return it unchanged.

    A valid id is 1 to 128 characters long and consists only of ASCII letters,
    digits, ``.``, ``_`` and ``-``.

    Raises:
        ValueError: if the id is not a string or contains anything else
            (including a trailing newline).
    """
    if not isinstance(report_id, str) or not _SAFE_REPORT_ID_RE.fullmatch(report_id):
        raise ValueError("invalid report id")
    return report_id
def _read_json(path: pathlib.Path) -> dict:
    """Decode the UTF-8 JSON document stored at *path* and return its value.

    Errors from reading the file or from the JSON decoder are not handled
    here; they propagate to the caller.
    """
    text = path.read_text(encoding="utf-8")
    return json.loads(text)
def _list_reports(reports_dir: pathlib.Path) -> list[dict]:
    """Return one summary entry per ``*.json`` file in *reports_dir*, newest first.

    Each entry has the keys ``id`` (file stem), ``file`` (file name), ``mtime``
    (modification time as whole seconds), ``summary``, ``severity`` and
    ``created_utc``. The last three come from the report itself; when the file
    cannot be read or parsed, or its top-level value is not a JSON object, the
    placeholders ``"(no summary)"``, ``"unknown"`` and ``None`` are used.

    A missing directory yields an empty list.
    """
    if not reports_dir.exists():
        return []

    # Stat each file exactly once. A file can disappear between glob() and
    # stat(); such a file is skipped instead of failing the whole listing.
    stamped: list[tuple[float, pathlib.Path]] = []
    for p in reports_dir.glob("*.json"):
        try:
            stamped.append((p.stat().st_mtime, p))
        except OSError:
            continue
    stamped.sort(key=lambda pair: pair[0], reverse=True)

    items: list[dict] = []
    for mtime, p in stamped:
        try:
            data = _read_json(p)
        except Exception:
            # Deliberately broad: an unreadable report is still listed, just
            # with placeholder fields.
            data = {}
        if not isinstance(data, dict):
            # Valid JSON whose top level is e.g. a list has no fields to read.
            data = {}
        items.append(
            {
                "id": p.stem,
                "file": p.name,
                "mtime": int(mtime),
                "summary": data.get("summary") or "(no summary)",
                "severity": data.get("severity") or "unknown",
                "created_utc": data.get("created_utc") or None,
            }
        )
    return items
def _read_report(reports_dir: pathlib.Path, report_id: str) -> dict:
    """Load the report ``<report_id>.json`` from *reports_dir*.

    Raises:
        ValueError: if ``_safe_report_id`` rejects *report_id*.
        FileNotFoundError: if no such report file exists; the exception
            argument is the report id.
    """
    validated = _safe_report_id(report_id)
    candidate = reports_dir.joinpath(validated + ".json")
    if candidate.exists():
        return _read_json(candidate)
    raise FileNotFoundError(validated)
def _write_json(path: pathlib.Path, data: dict) -> None:
    """Write *data* to *path* as indented UTF-8 JSON followed by a newline.

    The document is first written to a sibling ``<name>.tmp`` file and then
    moved over *path* with ``os.replace``, so *path* is only swapped once the
    new content has been written in full. Missing parent directories are
    created.

    If serialising or replacing fails, the temporary file is removed and the
    original exception is re-raised; an existing *path* is left untouched.
    """
    tmp = path.with_suffix(path.suffix + ".tmp")
    tmp.parent.mkdir(parents=True, exist_ok=True)
    try:
        with tmp.open("w", encoding="utf-8") as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
            f.write("\n")
        os.replace(tmp, path)
    except BaseException:
        # Do not leave a half-written temp file next to the real report.
        try:
            tmp.unlink()
        except OSError:
            pass
        raise
def _append_audit(reports_dir: pathlib.Path, entry: dict) -> None:
    """Append *entry* as one JSON line to today's audit log.

    The log lives at ``<reports_dir>/audit/actions-YYYY-MM-DD.jsonl``, where
    the date is the current UTC day. The ``audit`` directory is created when
    it does not exist yet.
    """
    log_dir = reports_dir.joinpath("audit")
    log_dir.mkdir(parents=True, exist_ok=True)
    today = _dt.datetime.now(tz=_dt.timezone.utc).date().isoformat()
    log_path = log_dir / f"actions-{today}.jsonl"
    with log_path.open("a", encoding="utf-8") as log:
        line = json.dumps(entry, ensure_ascii=False)
        log.write(line + "\n")
def _static_file(root: pathlib.Path, rel: str) -> pathlib.Path:
    """Resolve the request path *rel* to a file system path below *root*.

    Leading slashes are stripped from *rel*, the result is joined onto *root*
    and resolved (which collapses ``..`` segments and follows symlinks).

    Returns:
        The resolved path. It is not checked for existence here.

    Raises:
        ValueError: if the resolved path is not *root* itself or a descendant
            of it.
    """
    base = root.resolve()
    candidate = (root / rel.lstrip("/")).resolve()
    # Compare whole path components. A plain string-prefix test would accept a
    # sibling such as "/srv/web-secret" for the root "/srv/web".
    if not candidate.is_relative_to(base):
        raise ValueError("bad path")
    return candidate
class Handler(BaseHTTPRequestHandler):
server_version = "SaikyoAvWeb/0.1"
def _send_json(self, status: int, obj: dict | list) -> None:
body = json.dumps(obj, ensure_ascii=False).encode("utf-8")
self.send_response(status)
self.send_header("Content-Type", "application/json; charset=utf-8")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
def _send_text(self, status: int, text: str, content_type: str = "text/plain; charset=utf-8") -> None:
body = text.encode("utf-8")
self.send_response(status)
self.send_header("Content-Type", content_type)
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
def _read_body_json(self) -> dict:
length = int(self.headers.get("Content-Length", "0") or "0")
if length <= 0:
return {}
raw = self.rfile.read(length)
try:
return json.loads(raw.decode("utf-8"))
except Exception:
return {}
def do_GET(self) -> None: # noqa: N802
parsed = urllib.parse.urlparse(self.path)
path = parsed.path
if path == "/api/health":
self._send_json(200, {"status": "ok", "time_utc": _now_utc_iso()})
return
if path == "/api/reports":
reports = _list_reports(self.server.reports_dir)
self._send_json(200, {"reports": reports})
return
if path.startswith("/api/reports/"):
report_id = path.removeprefix("/api/reports/").strip("/")
try:
data = _read_report(self.server.reports_dir, report_id)
except FileNotFoundError:
self._send_json(404, {"error": "not_found"})
return
except ValueError:
self._send_json(400, {"error": "bad_request"})
return
self._send_json(200, {"id": report_id, "report": data})
return
# static
if path == "/":
path = "/index.html"
try:
p = _static_file(self.server.web_root, path)
except ValueError:
self._send_text(400, "bad request")
return
if not p.exists() or not p.is_file():
self._send_text(404, "not found")
return
ctype = "application/octet-stream"
if p.name.endswith(".html"):
ctype = "text/html; charset=utf-8"
elif p.name.endswith(".css"):
ctype = "text/css; charset=utf-8"
elif p.name.endswith(".js"):
ctype = "application/javascript; charset=utf-8"
elif p.name.endswith(".svg"):
ctype = "image/svg+xml"
try:
data = p.read_bytes()
except OSError:
self._send_text(500, "internal error")
return
self.send_response(200)
self.send_header("Content-Type", ctype)
self.send_header("Content-Length", str(len(data)))
self.end_headers()
self.wfile.write(data)
def do_POST(self) -> None: # noqa: N802
parsed = urllib.parse.urlparse(self.path)
path = parsed.path
if path == "/api/apply-fix":
body = self._read_body_json()
report_id = body.get("report_id")
fix_id = body.get("fix_id")
confirm = body.get("confirm")
if not report_id or not fix_id:
self._send_json(400, {"error": "missing_fields"})
return
# Consent gate: without explicit confirmation we do not attempt any fix.
if confirm is not True:
self._send_json(
409,
{
"error": "confirmation_required",
"message": "Fixes are applied only with explicit operator confirmation.",
},
)
return
# MVP: backend fixer not implemented yet.
entry = {
"time_utc": _now_utc_iso(),
"action": "apply_fix_requested",
"report_id": str(report_id),
"fix_id": str(fix_id),
"result": "not_implemented",
}
try:
_append_audit(self.server.reports_dir, entry)
except Exception:
pass
self._send_json(
501,
{
"error": "not_implemented",
"message": "Fix runner is not implemented in saikyo-av-web MVP. Use saikyo-av CLI once available.",
},
)
return
self._send_json(404, {"error": "not_found"})
class Server(ThreadingHTTPServer):
    """Threading HTTP server that also carries the directories to serve from.

    ``reports_dir`` and ``web_root`` are stored on the instance unchanged so
    that request handlers can reach them through ``self.server``.
    """

    def __init__(
        self,
        server_address,
        RequestHandlerClass,
        reports_dir: pathlib.Path,
        web_root: pathlib.Path,
    ) -> None:
        super().__init__(server_address, RequestHandlerClass)
        self.reports_dir, self.web_root = reports_dir, web_root
def _ensure_sample_report(reports_dir: pathlib.Path) -> None:
    """Create ``sample.json`` in *reports_dir* when it holds no ``*.json`` file.

    The directory is created if needed. The whole operation is best-effort:
    when the directory cannot be created or inspected, or the sample cannot be
    written, the function returns silently instead of raising.
    """
    # Create a small sample report only if the directory is empty.
    try:
        reports_dir.mkdir(parents=True, exist_ok=True)
        if any(reports_dir.glob("*.json")):
            return
    except OSError:
        return
    sample = {
        "created_utc": _now_utc_iso(),
        "severity": "info",
        "summary": "Saikyo Antivirus Web UI is installed (sample report)",
        "details": {
            "note": "This is a placeholder report. Real reports will appear here once saikyo-avd is installed and running.",
        },
        "suggested_fixes": [
            {
                "id": "collect_artifacts",
                "title": "Collect diagnostics artifacts",
                "description": "Run collect-artifacts.sh and attach the resulting archive to a support ticket.",
                "requires_consent": True,
            }
        ],
    }
    try:
        _write_json(reports_dir / "sample.json", sample)
    except OSError:
        # An existing but unwritable directory passes the checks above; the
        # optional sample must not turn that into a failure for the caller.
        return
def main(argv: list[str] | None = None) -> int:
    """Parse the command line, start the HTTP server and serve until stopped.

    Args:
        argv: Command-line arguments without the program name. ``None`` (the
            default) makes argparse read ``sys.argv``.

    Returns:
        ``0`` once the server has stopped, whether through Ctrl-C or because
        ``serve_forever`` returned.
    """
    parser = argparse.ArgumentParser(prog="saikyo-av-web")
    parser.add_argument("--bind", default="127.0.0.1")
    parser.add_argument("--port", type=int, default=8765)
    parser.add_argument("--reports-dir", default="/var/lib/saikyo-av/reports")
    parser.add_argument("--web-root", default="/usr/share/saikyo-av/web")
    parser.add_argument("--no-sample", action="store_true")
    args = parser.parse_args(argv)

    reports_dir = pathlib.Path(args.reports_dir)
    web_root = pathlib.Path(args.web_root)
    if not args.no_sample:
        _ensure_sample_report(reports_dir)

    httpd = Server((args.bind, args.port), Handler, reports_dir=reports_dir, web_root=web_root)
    # Report the port that was actually bound; it differs from --port when
    # port 0 asks the OS to pick a free one.
    bound_port = httpd.server_address[1]
    print(f"Listening on http://{args.bind}:{bound_port}")
    try:
        httpd.serve_forever()
    except KeyboardInterrupt:
        pass
    finally:
        # Release the listening socket on every exit path.
        httpd.server_close()
    return 0
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main())