| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| import json |
| import logging |
| import httpx |
| from pathlib import Path |
| from typing import Optional |
|
|
| from . import config |
| from . import db_sync |
|
|
# Module-wide logger; all ShellMaster messages are emitted under the
# "shellmaster" logger name.
logger: logging.Logger = logging.getLogger("shellmaster")
|
|
| |
| |
| |
| |
# Substring patterns that are refused unconditionally, whatever the command
# registry says. Every entry must be lowercase and use single spaces, because
# _is_hardblocked() lowercases and whitespace-normalizes the command before
# doing a plain substring test against each pattern.
_HARDCODED_BLOCKED = {
    # Unix: recursive/forced deletion and raw-device destruction
    "rm -rf", "rm -r /", "rm -f /",
    "mkfs", "dd if=", "> /dev/sd", "> /dev/hd",
    "shred /dev/", "wipefs",
    # Windows: formatting and recursive deletion
    "format c:", "format d:", "format /fs",
    "rmdir /s", "rd /s", "del /f /s /q c:\\",
    # Unix: fork bomb and root-level permission/ownership changes
    ":(){ :|:& };:",
    "chmod -r 777 /", "chmod -r 000 /", "chmod 000 /",
    "chown -r root /", "chown -r 0:0 /",
    # Windows: ACL, ownership and registry tampering
    "icacls c:\\ /grant", "icacls c:\\ /deny",
    "takeown /f c:\\",
    "reg delete hklm", "reg delete hkcu",
    # Unix: privilege escalation
    "sudo su", "sudo -i", "sudo -s",
    "su root", "su -", "passwd root", "visudo",
    # Windows: privilege escalation
    "runas /user:administrator",
    "net user administrator",
    "net localgroup administrators",
    "psexec -s",
    # Unix: sensitive system paths
    "/etc/passwd", "/etc/shadow", "/etc/sudoers",
    "/etc/crontab", "/boot/", "/proc/sysrq",
    # Windows: sensitive system paths and registry hives
    "c:\\windows\\system32\\", "c:\\windows\\system\\",
    "hklm\\system", "hklm\\security",
    # Unix: shutdown / reboot
    "reboot", "halt", "poweroff",
    "init 0", "init 6",
    "systemctl poweroff", "systemctl reboot", "systemctl halt",
    # Windows: shutdown / reboot
    "shutdown /s", "shutdown /r", "shutdown /h",
    # Firewall teardown (Unix and Windows)
    "iptables -f", "iptables --flush", "ufw disable",
    "netsh firewall set opmode disable",
    "netsh advfirewall set allprofiles state off",
}


def _is_hardblocked(cmd_str: str) -> bool:
    """Check if command contains any hardcoded blocked pattern.

    The command is lowercased and every run of whitespace (spaces, tabs,
    newlines) is collapsed to a single space before matching, so padding a
    command with extra blanks ("rm   -rf /") cannot slip past the single-space
    patterns in _HARDCODED_BLOCKED.

    Args:
        cmd_str: Raw command string.

    Returns:
        True if any blocked pattern occurs as a substring of the normalized
        command, False otherwise.
    """
    # str.split() with no separator splits on whitespace runs and drops
    # leading/trailing blanks, which also covers the former .strip().
    normalized = " ".join(cmd_str.lower().split())
    return any(blocked in normalized for blocked in _HARDCODED_BLOCKED)
|
|
|
|
| |
| |
| |
# Mutable module state. These are the defaults in effect until initialize()
# overwrites them from the "shellmaster" tool configuration.
_cfg: dict = {}                                        # raw tool settings
_agent_url: str = ""                                   # base URL of the agent
_token: str = ""                                       # bearer token for the agent
_cmd_file: Path = Path("shellmaster_commands.jsonl")   # JSONL command registry
_timeout: int = 30                                     # HTTP timeout passed to httpx
_initialized: bool = False                             # True once initialize() succeeds
|
|
|
|
| |
| |
| |
|
|
async def initialize() -> None:
    """
    Initialize ShellMaster from app/.pyfun [TOOL.shellmaster].

    Called by app/app.py — never reads os.environ directly.

    Reads the "shellmaster" entry of config.get_active_tools() and fills the
    module-level settings (_cfg, _agent_url, _token, _timeout, _cmd_file).
    ShellMaster is marked ready (_initialized = True) only when the tool is
    active, a token is configured and the commands file exists; in every
    other case a message is logged and the function returns with
    _initialized left False.

    Raises:
        ValueError: if "timeout_sec" cannot be converted to int.
        OSError: if the commands file exists but cannot be read.
    """
    global _cfg, _agent_url, _token, _cmd_file, _timeout, _initialized

    # Reset first so that a repeated call which ends in one of the "disabled"
    # branches below does not leave a stale ready flag from an earlier run.
    _initialized = False

    tools_cfg = config.get_active_tools()
    _cfg = tools_cfg.get("shellmaster", {})

    # str() keeps a non-string "active" value (e.g. a real bool) from raising.
    if not _cfg or str(_cfg.get("active", "false")).lower() != "true":
        logger.info("ShellMaster not active in .pyfun — skipped")
        return

    _agent_url = _cfg.get("shellmaster_agent_url", "http://localhost:5004")
    _token = _cfg.get("shellmaster_token", "")
    _timeout = int(_cfg.get("timeout_sec", "30"))

    cmd_file_path = _cfg.get("shellmaster_commands_file", "shellmaster_commands.jsonl")
    _cmd_file = Path(cmd_file_path)

    if not _token:
        logger.warning("ShellMaster: shellmaster_token not set in .pyfun — disabled")
        return

    if not _cmd_file.exists():
        logger.warning(f"ShellMaster: commands file not found: {_cmd_file} — disabled")
        return

    # Count allowed entries for the start-up log line. The file is opened in a
    # context manager so the handle is closed, and a malformed line is skipped
    # with a warning instead of aborting initialization.
    allowed = 0
    with _cmd_file.open(encoding="utf-8") as fh:
        for line_no, raw in enumerate(fh, start=1):
            raw = raw.strip()
            if not raw:
                continue
            try:
                entry = json.loads(raw)
            except json.JSONDecodeError as e:
                logger.warning(f"ShellMaster: commands file line {line_no} is not valid JSON: {e}")
                continue
            if isinstance(entry, dict) and entry.get("allowed", False):
                allowed += 1

    logger.info(f"ShellMaster initialized | agent: {_agent_url} | allowed commands: {allowed}")
    _initialized = True
|
|
|
|
| |
| |
| |
|
|
def _load_registry() -> dict:
    """
    Load command registry from local JSONL.

    Hot-reloaded on each call — no restart needed for registry changes.
    Each non-blank line of the commands file must be a JSON object with an
    "id" key; the object is stored under that id. A line that is not valid
    JSON, is not an object, or has no "id" is logged and skipped, so one bad
    line no longer discards every command that follows it.

    TODO: merge with HF Dataset registry if shellmaster_commands_dataset is set.

    Returns:
        Mapping of command id -> command dict. Empty if the file cannot be
        opened or read; this function never raises.
    """
    registry = {}
    try:
        with open(_cmd_file, encoding="utf-8") as f:
            for line_no, line in enumerate(f, start=1):
                line = line.strip()
                if not line:
                    continue
                try:
                    cmd = json.loads(line)
                    registry[cmd["id"]] = cmd
                except (ValueError, KeyError, TypeError) as e:
                    # ValueError: invalid JSON; KeyError: no "id";
                    # TypeError: line is not a JSON object / unhashable id.
                    logger.error(f"Registry line {line_no} skipped: {type(e).__name__}: {e}")
    except Exception as e:
        # Best-effort boundary: a missing or unreadable file yields whatever
        # was loaded so far rather than propagating to the caller.
        logger.error(f"Registry load failed: {type(e).__name__}: {e}")
    return registry
|
|
|
|
def _validate_command(cmd_str: str) -> tuple[bool, str]:
    """
    Validate a command string against registry + hardblock list.

    The hardblock list is checked first. Otherwise the first word of the
    command is compared (case-insensitively) with the first word of each
    registry entry's "unix" and "win" templates; the first entry that matches
    decides the outcome via its "allowed" flag.

    The comparison is on the whole first word, not a prefix, so a template
    "ls" does not authorize "lsof" or "lsblk". Entries with a missing or
    empty "unix"/"win" template are skipped instead of raising IndexError.

    Returns:
        (True, command_id) if valid and allowed
        (False, reason) if blocked
    """
    if _is_hardblocked(cmd_str):
        return False, "blocked by system policy"

    registry = _load_registry()
    cmd_tokens = cmd_str.lower().split()
    # An empty/blank command has no executable name and can match nothing.
    cmd_base = cmd_tokens[0] if cmd_tokens else ""

    for cmd_id, cmd in registry.items():
        for os_key in ("unix", "win"):
            # `or ""` also covers an explicit null template in the registry.
            template_tokens = str(cmd.get(os_key) or "").lower().split()
            if not template_tokens or template_tokens[0] != cmd_base:
                continue
            if not cmd.get("allowed", False):
                return False, f"command '{cmd_id}' is blocked in registry"
            return True, cmd_id

    return False, f"command not found in registry: {cmd_str[:50]}"
|
|
|
|
def list_commands(category: str = None) -> list:
    """Return the registry entries flagged as allowed.

    Args:
        category: When truthy, keep only entries whose "category" value
            equals it; when omitted or empty, no category filter is applied.

    Returns:
        List of command dicts, in registry order.
    """
    selected = []
    for entry in _load_registry().values():
        if not entry.get("allowed", False):
            continue
        if category and entry.get("category") != category:
            continue
        selected.append(entry)
    return selected
|
|
|
|
| |
| |
| |
|
|
def build_confirmation(llm_result: dict) -> str:
    """
    Build confirmation message from LLM-generated command plan.

    Args:
        llm_result: Plan dict; the keys "command", "backup", "recovery" and
            "risk" are read, all optional.

    Returns:
        Formatted multi-line string for the user to confirm/deny. The
        "Backup:" and "Recovery:" lines appear only when those values are
        non-empty. A missing, empty or None risk is shown as "UNKNOWN".
    """
    command = llm_result.get("command", "")
    backup = llm_result.get("backup", "")
    recovery = llm_result.get("recovery", "")
    # The plan comes from an LLM, so "risk" may be None or a non-string;
    # coerce before upper-casing instead of raising AttributeError.
    risk = str(llm_result.get("risk") or "unknown").upper()

    lines = [
        "⚠️ ShellMaster — Confirmation Required",
        "",
        f"Command: {command}",
        f"Risk: {risk}",
    ]

    if backup:
        lines.append(f"Backup: {backup}")
    if recovery:
        lines.append(f"Recovery: {recovery}")

    lines += [
        "",
        "Type 'confirm shellmaster' to execute or 'cancel' to abort.",
    ]
    return "\n".join(lines)
|
|
|
|
async def store_recovery_plan(command: str, backup: str, recovery: str) -> None:
    """Persist the recovery plan for the last command via db_sync.

    The three values are written as one dict under the key
    "shellmaster.last_command". Any exception from the write is logged as a
    warning (exception type only) and swallowed, so this never raises.
    """
    plan = {
        "command": command,
        "backup": backup,
        "recovery": recovery,
    }
    try:
        await db_sync.write("shellmaster.last_command", plan)
        logger.info("ShellMaster: recovery plan stored in db_sync")
    except Exception as exc:
        logger.warning(f"ShellMaster: recovery plan store failed: {type(exc).__name__}")
|
|
|
|
async def get_recovery_plan() -> dict:
    """Read back the value stored under "shellmaster.last_command" in db_sync.

    An empty dict is passed to db_sync.read as the default.
    """
    last_plan = await db_sync.read("shellmaster.last_command", default={})
    return last_plan
|
|
|
|
| |
| |
| |
|
|
async def execute_confirmed(command: str) -> str:
    """
    Execute a command on the local agent.
    ONLY called after explicit user confirmation — never auto-execute!

    The command is POSTed as JSON to "<agent_url>/command" with the
    configured bearer token. The hardblock list is re-checked here as a last
    line of defense; the registry is NOT re-checked.

    Args:
        command: Raw shell command string (already validated).

    Returns:
        Command output or error message, always as a string. The output is
        taken from the response's "result" field, falling back to "output".
        This function does not raise; every failure is turned into a message.
    """
    if not _initialized:
        return "ShellMaster not initialized."

    if _is_hardblocked(command):
        logger.warning(f"HARDBLOCK on confirmed command: {command[:60]}")
        return "Blocked by system policy — cannot execute."

    try:
        async with httpx.AsyncClient() as client:
            r = await client.post(
                f"{_agent_url}/command",
                json={"command": command},
                headers={
                    "Authorization": f"Bearer {_token}",
                    "Content-Type": "application/json",
                },
                timeout=_timeout,
            )
            r.raise_for_status()
            data = r.json()
            output = data.get("result", data.get("output", ""))
            logger.info(f"ShellMaster executed: {command[:60]}")
            # The agent may send null, a number or a structure; honor the
            # declared str return type so callers can safely concatenate.
            return "" if output is None else str(output)

    except httpx.ConnectError:
        return f"ShellMaster Agent not reachable at {_agent_url} — is it running?"
    except httpx.HTTPStatusError as e:
        return f"Agent error: HTTP {e.response.status_code}"
    except Exception as e:
        # Boundary catch-all (timeouts, invalid JSON, unexpected payload shape).
        logger.warning(f"ShellMaster execute failed: {type(e).__name__}: {e}")
        return f"Error: {type(e).__name__}"
|
|
|
|
def is_ready() -> bool:
    """Return the module's initialized flag (set to True at the end of a successful initialize())."""
    ready = _initialized
    return ready
|
|
|
|
def get_config() -> dict:
    """Return the module's current "shellmaster" settings dict (the live object, not a copy)."""
    current_cfg = _cfg
    return current_cfg