Spaces:
Running
Running
| # api/logger.py | |
| # Two-layer durable logging strategy | |
| # | |
| # Layer 1: Local JSONL (fast write, ephemeral β wiped on HF Space restart) | |
| # Used by Evidently drift scripts | |
| # Layer 2: HF Dataset API push (durable, survives restarts) | |
| # Called as FastAPI BackgroundTask β never blocks response | |
| # | |
| # If HF push fails: user unaffected, local log still written | |
| import os | |
| import json | |
| import time | |
| from datetime import datetime, timezone | |
| from huggingface_hub import HfApi | |
| HF_REPO_ID = os.environ.get("HF_LOG_REPO", "CaffeinatedCoding/anomalyos-logs") | |
| LOCAL_LOG_DIR = "logs" | |
| LOCAL_LOG_PATH = os.path.join(LOCAL_LOG_DIR, "inference.jsonl") | |
| _hf_api: HfApi = None | |
| _hf_push_failure_count: int = 0 | |
| def init_logger(hf_token: str): | |
| """Called once at FastAPI startup.""" | |
| global _hf_api | |
| os.makedirs(LOCAL_LOG_DIR, exist_ok=True) | |
| if hf_token: | |
| _hf_api = HfApi(token=hf_token) | |
| print(f"Logger initialised | HF repo: {HF_REPO_ID}") | |
| else: | |
| print("WARNING: HF_TOKEN not set β only local logging active") | |
| def log_inference(record: dict): | |
| """ | |
| Layer 1: write to local JSONL synchronously. | |
| Called as BackgroundTask from FastAPI β does not block response. | |
| """ | |
| global _hf_push_failure_count | |
| # Ensure timestamp | |
| if "timestamp" not in record: | |
| record["timestamp"] = datetime.now(timezone.utc).isoformat() | |
| # ββ Layer 1: Local JSONL ββββββββββββββββββββββββββββββββββ | |
| try: | |
| with open(LOCAL_LOG_PATH, "a") as f: | |
| f.write(json.dumps(record) + "\n") | |
| except Exception as e: | |
| print(f"Local log write failed: {e}") | |
| # ββ Layer 2: HF Dataset push βββββββββββββββββββββββββββββ | |
| if _hf_api is None: | |
| return | |
| try: | |
| ts = record.get("timestamp", datetime.now(timezone.utc).isoformat()) | |
| # Sanitise timestamp for filename | |
| ts_safe = ts.replace(":", "-").replace(".", "-")[:26] | |
| path_in_repo = f"inference_logs/{ts_safe}_{record.get('image_hash', 'unknown')[:8]}.json" | |
| _hf_api.upload_file( | |
| path_or_fileobj=json.dumps(record, indent=2).encode("utf-8"), | |
| path_in_repo=path_in_repo, | |
| repo_id=HF_REPO_ID, | |
| repo_type="dataset" | |
| ) | |
| except Exception as e: | |
| _hf_push_failure_count += 1 | |
| print(f"HF Dataset push failed (count={_hf_push_failure_count}): {e}") | |
| # User response is completely unaffected β local log already written | |
| def log_arena_submission(record: dict): | |
| """Log Arena Mode submissions to shared leaderboard dataset.""" | |
| record["log_type"] = "arena" | |
| log_inference(record) | |
| def log_correction(record: dict): | |
| """Log user corrections from /correct/{case_id}.""" | |
| record["log_type"] = "correction" | |
| log_inference(record) | |
| def get_recent_logs(n: int = 200) -> list: | |
| """ | |
| Read last n records from local JSONL. | |
| Used by Evidently drift scripts. | |
| """ | |
| if not os.path.exists(LOCAL_LOG_PATH): | |
| return [] | |
| records = [] | |
| try: | |
| with open(LOCAL_LOG_PATH) as f: | |
| for line in f: | |
| line = line.strip() | |
| if line: | |
| records.append(json.loads(line)) | |
| except Exception as e: | |
| print(f"Error reading local log: {e}") | |
| return records[-n:] | |
| def get_push_failure_count() -> int: | |
| return _hf_push_failure_count |