|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| import aiosqlite |
| import logging |
| import json |
| from datetime import datetime, timezone |
| from typing import Any, Dict, List, Optional |
|
|
| from . import config |
|
|
| logger = logging.getLogger("db_sync") |
|
|
| |
| |
| |
# Resolved path to the SQLite file; set by initialize() (relocated to /tmp on HF Spaces).
_db_path: Optional[str] = None
# Idempotency guard: True once initialize() has created tables successfully.
_initialized: bool = False
|
|
|
|
| |
| |
| |
|
|
async def initialize() -> None:
    """
    Resolve the SQLite path, create internal tables, and mark the module ready.

    Idempotent: subsequent calls return immediately once initialization
    has completed.

    Raises:
        Whatever aiosqlite raises if the database file cannot be opened.
    """
    global _db_path, _initialized

    if _initialized:
        return

    db_cfg = config.get_db_sync()
    raw_path = db_cfg.get("SQLITE_PATH", "app/.hub_state.db")

    import os
    if os.getenv("SPACE_ID"):
        # Running on a Hugging Face Space — presumably the configured app
        # directory is not writable there (TODO confirm), so relocate the
        # database file to /tmp, keeping only its basename.
        filename = os.path.basename(raw_path)
        # BUG FIX: a literal placeholder was previously interpolated here
        # instead of the computed basename, so `filename` was never used.
        _db_path = f"/tmp/{filename}"
        logger.info(f"HF Space detected → SQLite relocated to {_db_path}")
    else:
        _db_path = raw_path

    await _init_tables()

    _initialized = True
    logger.info(f"db_sync initialized → path: {_db_path}")
|
|
|
|
| |
| |
| |
|
|
async def _init_tables() -> None:
    """
    Creates app/* internal tables if they don't exist.
    NEVER modifies Guardian tables (users, sessions).

    Tables created (all idempotent via IF NOT EXISTS):
      - hub_state:  key/value store, JSON-serialized values (see write/read).
      - tool_cache: per-tool response cache, pruned in cache_write().
    """
    async with aiosqlite.connect(_db_path) as db:

        # Key/value store for arbitrary app state; `value` holds JSON text.
        await db.execute("""
            CREATE TABLE IF NOT EXISTS hub_state (
                key TEXT PRIMARY KEY,
                value TEXT,
                updated_at TEXT
            )
        """)

        # Response cache keyed by (tool_name, prompt); newest row wins on read.
        await db.execute("""
            CREATE TABLE IF NOT EXISTS tool_cache (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                tool_name TEXT NOT NULL,
                prompt TEXT NOT NULL,
                response TEXT NOT NULL,
                provider TEXT,
                model TEXT,
                created_at TEXT
            )
        """)

        # Speeds up cache_read's tool_name filter.
        await db.execute("""
            CREATE INDEX IF NOT EXISTS idx_tool_cache_tool
            ON tool_cache(tool_name)
        """)

        await db.commit()

    logger.info("db_sync tables ready.")
|
|
|
|
| |
| |
| |
|
|
async def write(key: str, value: Any) -> None:
    """
    Store a value in the hub_state key/value table, JSON-serialized.
    Supports dicts, lists, strings, and numbers; an existing key is replaced.

    Args:
        key: Unique key string (e.g. 'scheduler.last_run').
        value: Any JSON-serializable value.
    """
    _check_init()

    timestamp = datetime.now(timezone.utc).isoformat()
    serialized = json.dumps(value)

    async with aiosqlite.connect(_db_path) as db:
        await db.execute("""
            INSERT OR REPLACE INTO hub_state (key, value, updated_at)
            VALUES (?, ?, ?)
        """, (key, serialized, timestamp))
        await db.commit()
|
|
|
|
async def read(key: str, default: Any = None) -> Any:
    """
    Fetch a value from the hub_state key/value store.

    Args:
        key: Key string to look up.
        default: Value returned when the key is absent. Default: None.

    Returns:
        The JSON-deserialized value; the raw stored string when it is not
        valid JSON; or `default` when the key does not exist.
    """
    _check_init()

    async with aiosqlite.connect(_db_path) as db:
        cursor = await db.execute(
            "SELECT value FROM hub_state WHERE key = ?", (key,)
        )
        row = await cursor.fetchone()

    if row is None:
        return default

    raw = row[0]
    try:
        return json.loads(raw)
    except (json.JSONDecodeError, TypeError):
        # Not valid JSON (e.g. written before serialization was enforced):
        # hand back the raw stored text instead of failing.
        return raw
|
|
|
|
async def delete(key: str) -> None:
    """
    Remove a key from hub_state; deleting a non-existent key is a no-op.

    Args:
        key: Key string to delete.
    """
    _check_init()

    db = await aiosqlite.connect(_db_path)
    try:
        await db.execute("DELETE FROM hub_state WHERE key = ?", (key,))
        await db.commit()
    finally:
        await db.close()
|
|
|
|
| |
| |
| |
|
|
async def cache_write(
    tool_name: str,
    prompt: str,
    response: str,
    provider: Optional[str] = None,  # annotation fixed: default is None, so Optional
    model: Optional[str] = None,     # annotation fixed: default is None, so Optional
) -> None:
    """
    Cache a tool response to reduce redundant API calls.

    After inserting, prunes the cache to the newest MAX_CACHE_ENTRIES rows
    (config key, default 1000) so the table cannot grow without bound.

    Args:
        tool_name: Tool name (e.g. 'llm_complete', 'web_search').
        prompt: The input prompt/query that was used.
        response: The response to cache.
        provider: Provider name used (optional).
        model: Model name used (optional).
    """
    _check_init()

    db_cfg = config.get_db_sync()
    max_entries = int(db_cfg.get("MAX_CACHE_ENTRIES", "1000"))
    now = datetime.now(timezone.utc).isoformat()

    async with aiosqlite.connect(_db_path) as db:
        await db.execute("""
            INSERT INTO tool_cache (tool_name, prompt, response, provider, model, created_at)
            VALUES (?, ?, ?, ?, ?, ?)
        """, (tool_name, prompt, response, provider, model, now))

        # Keep only the newest `max_entries` rows (by created_at).
        await db.execute("""
            DELETE FROM tool_cache WHERE id NOT IN (
                SELECT id FROM tool_cache ORDER BY created_at DESC LIMIT ?
            )
        """, (max_entries,))

        await db.commit()
|
|
|
|
async def cache_read(tool_name: str, prompt: str) -> Optional[str]:
    """
    Look up the most recently cached response for (tool_name, prompt).

    Args:
        tool_name: Tool name to look up.
        prompt: The exact prompt/query to match.

    Returns:
        Cached response string, or None if no entry matches.
    """
    _check_init()

    async with aiosqlite.connect(_db_path) as db:
        cursor = await db.execute("""
            SELECT response FROM tool_cache
            WHERE tool_name = ? AND prompt = ?
            ORDER BY created_at DESC LIMIT 1
        """, (tool_name, prompt))
        hit = await cursor.fetchone()

    if hit is None:
        return None
    return hit[0]
|
|
|
|
| |
| |
| |
|
|
async def query(sql: str) -> List[Dict]:
    """
    Execute a read-only SELECT query on the internal hub state database.
    Only SELECT statements are permitted — write operations are blocked.
    Called by mcp.py db_query tool when db_sync.py is active.

    Args:
        sql: SQL SELECT statement to execute.

    Returns:
        List of result rows as dicts (column name -> value).

    Raises:
        ValueError: If the query is not a SELECT statement.
    """
    _check_init()

    # Guard: only statements literally beginning with SELECT pass.
    # NOTE(review): this also rejects read-only CTEs ("WITH ... SELECT") —
    # presumably intentional; confirm before loosening the check.
    if not sql.strip().upper().startswith("SELECT"):
        raise ValueError("Only SELECT queries are permitted in db_query tool.")

    async with aiosqlite.connect(_db_path) as db:
        db.row_factory = aiosqlite.Row
        cursor = await db.execute(sql)
        records = await cursor.fetchall()
        return [dict(record) for record in records]
|
|
|
|
| |
| |
| |
|
|
def _check_init() -> None:
    """Raise RuntimeError if db_sync was not initialized via initialize()."""
    if not _initialized or not _db_path:
        # Error message repaired: a mis-encoded character ("β") replaced the
        # intended arrow in the original text.
        raise RuntimeError("db_sync not initialized → call initialize() first.")
|
|
|
|
def is_ready() -> bool:
    """True once initialize() has completed and a database path is set."""
    return _db_path is not None and _initialized
|
|
|
|
| |
| |
| |
| _psql_writer = None |
|
|
|
|
def set_psql_writer(writer_fn) -> None:
    """
    Register Guardian's execute_secured_query callable as the PostgreSQL writer.

    Invoked once in start_application() when db_service is available. app/*
    never imports postgresql.py directly — this setter is the only bridge.

    Args:
        writer_fn: Async callable executing a secured parameterized query.
    """
    global _psql_writer
    _psql_writer = writer_fn
    logger.info("PostgreSQL writer registered.")
|
|
|
|
async def persist(table: str, data: dict) -> None:
    """
    Persist a JSON payload into a PostgreSQL table via the registered writer.

    Args:
        table: Target table name; must be a plain identifier (the table
            expects `payload jsonb` and `created_at` columns).
        data: JSON-serializable dict stored in the `payload` column.

    Raises:
        RuntimeError: If no PostgreSQL writer was registered.
        ValueError: If `table` is not a valid identifier.
    """
    if not _psql_writer:
        raise RuntimeError("No PostgreSQL writer → DATABASE_URL not configured.")

    # Table names cannot be bound as query parameters, so the name is
    # interpolated into the SQL text. Reject anything that is not a plain
    # identifier to prevent SQL injection through `table`.
    if not table.isidentifier():
        raise ValueError(f"Invalid table name: {table!r}")

    now = datetime.now(timezone.utc).isoformat()

    sql = f"INSERT INTO {table} (payload, created_at) VALUES ($1::jsonb, $2)"
    await _psql_writer(sql, json.dumps(data), now)
    logger.info(f"Persisted to PostgreSQL table '{table}'.")
|
|
| |
| |
| |
|
|
# This module is meant to be imported (via main.py → app.py); direct
# execution only prints a warning and exits.
if __name__ == "__main__":
    print("WARNING: Run via main.py β app.py, not directly.")
|
|