Spaces:
Running on T4
Running on T4
| """Minimal FastAPI server for sandbox operations.""" | |
| import os, subprocess, pathlib | |
| from fastapi import FastAPI | |
| from pydantic import BaseModel | |
| from typing import Optional | |
| import uvicorn | |
# Application object; it is handed to uvicorn.run() under the __main__ guard.
# NOTE(review): no route registrations (e.g. @app.get / @app.post decorators)
# are visible in this view of the file -- confirm the functions defined below
# are actually attached to `app`.
app = FastAPI()
class BashReq(BaseModel):
    """Request body for running a shell command."""

    # Command line passed to the shell as-is.
    command: str
    # Directory the command is started in.
    work_dir: str = "/app"
    # Seconds to wait for the command before giving up.
    timeout: int = 120
class ReadReq(BaseModel):
    """Request body for reading a range of lines from a text file."""

    # File to read.
    path: str
    # 1-based number of the first line to return; None (or 0) starts at line 1.
    offset: Optional[int] = None
    # Maximum number of lines to return; None (or 0) removes the cap.
    limit: Optional[int] = 2000
class WriteReq(BaseModel):
    """Request body for writing a text file."""

    # Destination file; missing parent directories are created by the writer.
    path: str
    # Full text that becomes the file's content.
    content: str
class EditReq(BaseModel):
    """Request body for a search-and-replace edit of a text file."""

    # File to modify in place.
    path: str
    # Exact text to look for; the edit fails when it is absent.
    old_str: str
    # Text that replaces each matched occurrence of old_str.
    new_str: str
    # When False, old_str must match exactly once; when True, every
    # occurrence is replaced.
    replace_all: bool = False
class ExistsReq(BaseModel):
    """Request body for checking whether a filesystem path exists."""

    # Path to probe (file or directory).
    path: str
def health():
    """Return a fixed payload signalling that the process is responsive."""
    return dict(status="ok")
def bash(req: BashReq):
    """Run ``req.command`` through the shell and report the outcome.

    The command is started in ``req.work_dir`` and abandoned after
    ``req.timeout`` seconds. Stdout and stderr are captured and concatenated
    (stdout first); anything beyond 30000 characters is cut off and a
    truncation marker is appended.

    Returns:
        A dict with ``success`` (True when the exit code is 0), ``output``
        (the captured text) and ``error`` (empty on success, otherwise the
        exit code, a timeout notice, or the exception message).
    """
    max_output = 30000
    try:
        # NOTE: shell=True executes the request string verbatim. That is the
        # purpose of this function, so it must only be reachable by trusted
        # callers.
        proc = subprocess.run(
            req.command,
            shell=True,
            capture_output=True,
            text=True,
            # Undecodable bytes (e.g. binary output) become U+FFFD instead of
            # failing the whole call with a decode error.
            errors="replace",
            cwd=req.work_dir,
            timeout=req.timeout,
        )
    except subprocess.TimeoutExpired:
        return {"success": False, "output": "", "error": f"Timeout after {req.timeout}s"}
    except Exception as e:
        # Covers launch failures such as a non-existent working directory.
        return {"success": False, "output": "", "error": str(e)}

    output = proc.stdout + proc.stderr
    if len(output) > max_output:
        output = output[:max_output] + "\n... (truncated)"
    ok = proc.returncode == 0
    return {
        "success": ok,
        "output": output,
        "error": "" if ok else f"Exit code {proc.returncode}",
    }
def read(req: ReadReq):
    """Return a numbered range of lines from the text file at ``req.path``.

    ``req.offset`` is the 1-based number of the first line to return and
    ``req.limit`` the maximum number of lines. A missing, zero or negative
    offset starts at line 1; a missing, zero or negative limit returns
    everything from the offset to the end of the file.

    Returns:
        A dict with ``success``, ``output`` and ``error``. On success
        ``output`` holds the selected lines joined by newlines, each prefixed
        with its line number and a tab. A missing path, a directory, or any
        exception while reading yields ``success`` False and a message in
        ``error``.
    """
    try:
        p = pathlib.Path(req.path)
        if not p.exists():
            return {"success": False, "output": "", "error": f"File not found: {req.path}"}
        if p.is_dir():
            return {"success": False, "output": "", "error": f"Is a directory: {req.path}"}
        lines = p.read_text().splitlines()

        # Clamp to the first line: a negative offset would otherwise slice
        # from the end of the file and produce negative line numbers.
        first = max(req.offset or 1, 1)
        start = first - 1
        # A negative limit used as a slice bound would silently drop lines
        # from the end, so treat every non-positive limit as "no cap".
        if req.limit is not None and req.limit > 0:
            selected = lines[start:start + req.limit]
        else:
            selected = lines[start:]

        numbered = "\n".join(
            f"{number}\t{line}" for number, line in enumerate(selected, start=first)
        )
        return {"success": True, "output": numbered, "error": ""}
    except Exception as e:
        return {"success": False, "output": "", "error": str(e)}
def write(req: WriteReq):
    """Create or overwrite the file at ``req.path`` with ``req.content``.

    Missing parent directories are created first.

    Returns:
        A dict with ``success``, ``output`` and ``error``. On success
        ``output`` reports the size of the written file in bytes; on any
        exception ``success`` is False and ``error`` holds its message.
    """
    try:
        p = pathlib.Path(req.path)
        p.parent.mkdir(parents=True, exist_ok=True)
        p.write_text(req.content)
        # len(req.content) counts characters, which differs from the byte
        # count for non-ASCII text; report what actually landed on disk.
        written = p.stat().st_size
        return {"success": True, "output": f"Wrote {written} bytes to {req.path}", "error": ""}
    except Exception as e:
        return {"success": False, "output": "", "error": str(e)}
def edit(req: EditReq):
    """Replace ``req.old_str`` with ``req.new_str`` inside ``req.path``.

    Without ``req.replace_all`` the search text must occur exactly once;
    with it, every occurrence is replaced. An empty ``old_str`` is rejected
    because it matches between every pair of characters and would scatter
    ``new_str`` through the whole file.

    Returns:
        A dict with ``success``, ``output`` and ``error``. The file is only
        rewritten when ``success`` is True.
    """
    try:
        p = pathlib.Path(req.path)
        if not p.exists():
            return {"success": False, "output": "", "error": f"File not found: {req.path}"}
        if not req.old_str:
            return {"success": False, "output": "", "error": "old_str must not be empty"}

        content = p.read_text()
        occurrences = content.count(req.old_str)
        if occurrences == 0:
            return {"success": False, "output": "", "error": f"old_str not found in {req.path}"}
        if occurrences > 1 and not req.replace_all:
            return {"success": False, "output": "", "error": f"old_str appears {occurrences} times. Use replace_all=true or provide more context."}

        # Either replace_all is set or there is exactly one match, so an
        # unbounded replace is correct in both cases.
        p.write_text(content.replace(req.old_str, req.new_str))
        return {"success": True, "output": f"Edited {req.path}", "error": ""}
    except Exception as e:
        return {"success": False, "output": "", "error": str(e)}
def exists(req: ExistsReq):
    """Report whether ``req.path`` exists on the filesystem.

    Returns:
        A dict with ``success``, ``output`` and ``error``. On success
        ``output`` is the string ``"true"`` or ``"false"``. If probing the
        path raises, ``success`` is False and ``error`` holds the message,
        the same shape the other path-based operations in this file use.
    """
    try:
        # Path.exists() may propagate OS errors other than "not found"
        # (for example permission denied) on some Python versions.
        found = pathlib.Path(req.path).exists()
    except Exception as e:
        return {"success": False, "output": "", "error": str(e)}
    return {"success": True, "output": "true" if found else "false", "error": ""}
# Script entry point: serve `app` with uvicorn on all interfaces, port 7860.
# NOTE(review): this file defines an operation that runs arbitrary shell
# commands -- confirm that access to this port is restricted by the
# deployment environment.
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=7860)