| """ |
| server.py |
| |
| Runs a simulation between AI datacenter workloads and an electrical grid (IEEE 13-bus OpenDSS model). |
| |
Uses GPU power traces and workloads to model how AI inference/training affects grid voltage and stability over time.
| """ |
|
|
|
|
|
|
| from fractions import Fraction |
| from pathlib import Path |
| import subprocess, tempfile, os, uvicorn, threading, math, json, hashlib |
|
|
| import pandas as pd |
| from fastapi import FastAPI, HTTPException, Response |
| from fastapi.middleware.cors import CORSMiddleware |
| from pydantic import BaseModel |
| from typing import Optional |
| from fastapi import WebSocket, WebSocketDisconnect |
|
|
|
|
| from openg2g.coordinator import Coordinator |
|
|
| from openg2g.datacenter.config import ( |
| DatacenterConfig, |
| InferenceModelSpec, |
| PowerAugmentationConfig, |
| TrainingRun, |
| ReplicaSchedule, |
| ) |
|
|
| from openg2g.datacenter.offline import OfflineDatacenter, OfflineWorkload |
| from openg2g.datacenter.workloads.inference import InferenceData |
| from openg2g.datacenter.workloads.training import TrainingTrace, TrainingTraceParams |
| from openg2g.grid.opendss import OpenDSSGrid |
| from openg2g.grid.config import TapPosition |
| from openg2g.controller.tap_schedule import TapScheduleController |
| from openg2g.metrics.voltage import compute_allbus_voltage_stats |
|
|
| import logging |
|
|
| logging.basicConfig( |
| level=logging.INFO, |
| format="%(asctime)s %(levelname)s [%(name)s] %(message)s", |
| ) |
| logger = logging.getLogger(__name__) |
|
|
|
|
|
|
| import asyncio, time |
| from concurrent.futures import ProcessPoolExecutor |
|
|
| import json |
|
|
|
|
| |
# Process pool for CPU-bound simulation jobs, so the asyncio event loop
# serving HTTP/WebSocket traffic stays responsive while a run is in flight.
_pool = ProcessPoolExecutor(max_workers=2)


# Server start time (epoch seconds).
# NOTE(review): not referenced elsewhere in this file — presumably kept for
# uptime reporting; confirm before removing.
_start_time = time.time()
|
|
|
|
|
|
# IEEE 13-bus OpenDSS case files bundled with the project.
DSS_DIR = Path(__file__).parent / "examples/ieee13"
DSS_MASTER = "IEEE13Nodeckt.dss"
# Offline datacenter configuration (model specs, optional data_dir override).
CONFIG_PATH = Path(__file__).parent / "examples/offline/config.json"




# Frontend bus indices (1-13) mapped to OpenDSS bus names in the IEEE 13-bus case.
BUS_INDEX_TO_NAME = {
    1:"650", 2:"632", 3:"633", 4:"645", 5:"646", 6:"671",
    7:"684", 8:"611", 9:"634", 10:"675", 11:"652", 12:"680", 13:"692",
}
# Bus names in frontend index order; position i corresponds to bus index i+1.
BUSES_ORDERED = [BUS_INDEX_TO_NAME[i] for i in range(1, 14)]
|
|
|
|
| |
# Parse the offline config once at import: model specs plus optional overrides.
_config_raw = json.loads(CONFIG_PATH.read_text())
_MODELS = tuple(InferenceModelSpec(**m) for m in _config_raw["models"])
_DC_CONFIG = DatacenterConfig(gpus_per_server=8, base_kw_per_phase=500.0)




# Trace data directory: config override wins, otherwise the bundled specs dir.
if _config_raw.get("data_dir"):
    _DATA_DIR = Path(_config_raw["data_dir"])
else:
    _DATA_DIR = Path(__file__).parent / "data/specs"



# Index CSV listing every available (model, num_gpus, max_num_seqs) trace.
_TRACES_SUMMARY_PATH = _DATA_DIR / "traces_summary.csv"


# Lazily-populated cache for the trace index; filled by _load_traces_index().
_traces_df: pd.DataFrame | None = None




# One regulator tap step as a fraction of nominal voltage (0.625%).
TAP_STEP = 0.00625
# Initial per-phase tap positions handed to OpenDSSGrid at construction.
INITIAL_TAPS = TapPosition(
    a=1.0 + 14 * TAP_STEP,
    b=1.0 + 6 * TAP_STEP,
    c=1.0 + 15 * TAP_STEP,
)
|
|
| |
# Scheduled tap changes during a run: phases a/c are raised at t=75 s, then
# lowered at t=200 s; phase b is held at its initial position throughout.
TAP_CHANGE_SCHEDULE = (
    TapPosition(
        a=1.0 + 16 * TAP_STEP,
        b=1.0 + 6 * TAP_STEP,
        c=1.0 + 17 * TAP_STEP,
    ).at(t=75)
    | TapPosition(
        a=1.0 + 10 * TAP_STEP,
        b=1.0 + 6 * TAP_STEP,
        c=1.0 + 10 * TAP_STEP,
    ).at(t=200)
)
|
|
|
|
| """ |
| Load trace index CSV and cache it. |
| """ |
| def _load_traces_index() -> pd.DataFrame: |
| global _traces_df |
| if _traces_df is None: |
| if _TRACES_SUMMARY_PATH.exists(): |
| _traces_df = pd.read_csv(_TRACES_SUMMARY_PATH) |
| else: |
| _traces_df = pd.DataFrame(columns=["model_label","num_gpus","max_num_seqs","trace_file"]) |
| return _traces_df |
|
|
|
|
| """ |
| Lookup GPU power trace and scale by replica count. |
| Returns a list of per-timestep total power values in watts. |
| |
| """ |
| def _get_trace_power(model_label: str, num_gpus: int, max_num_seqs: int, |
| num_replicas: int = 1) -> list[float]: |
| |
| df = _load_traces_index() |
| row = df[ |
| (df["model_label"] == model_label) & |
| (df["num_gpus"] == num_gpus) & |
| (df["max_num_seqs"]== max_num_seqs) |
| ] |
| if row.empty: |
| raise ValueError( |
| f"No trace found for model={model_label} num_gpus={num_gpus} " |
| f"max_num_seqs={max_num_seqs}. " |
| f"Available: {df[['model_label','num_gpus','max_num_seqs']].to_dict('records')}" |
| ) |
| trace_file = _DATA_DIR / row.iloc[0]["trace_file"] |
| trace_df = pd.read_csv(trace_file) |
| |
| power_W = trace_df["power_total_W"].tolist() |
| return [p * num_replicas for p in power_W] |
|
|
|
|
# Log data availability at import time and warm the trace index cache.
logger.info(f"Data dir: {_DATA_DIR} exists={_DATA_DIR.exists()}")
_load_traces_index()
|
|
|
|
| """Datacenter workload (baseline)""" |
| def _build_dc(scale: float = 1.0, duration_s: int = 300) -> OfflineDatacenter: |
|
|
| df = _load_traces_index() |
| first_row = df.iloc[0] |
| |
| first_model = tuple(m for m in _MODELS if m.model_label == first_row["model_label"]) |
| inference_data = InferenceData.load(_DATA_DIR, first_model) |
|
|
| training_trace = TrainingTrace.ensure( |
| _DATA_DIR / "training_trace.csv", |
| TrainingTraceParams(), |
| ) |
|
|
| t0 = min(40.0, duration_s * 0.13) |
| t1 = min(140.0, duration_s * 0.47) |
|
|
| replica_schedules = {} |
|
|
| for m in _MODELS: |
|
|
| initial_replicas = max(1, int(scale * 8)) |
|
|
| reduced_replicas = max(1, int(initial_replicas * 0.25)) |
|
|
| replica_schedules[m.model_label] = ( |
| ReplicaSchedule(initial=initial_replicas) |
| .ramp_to( |
| reduced_replicas, |
| t_start=min(150.0, duration_s * 0.50), |
| t_end=min(220.0, duration_s * 0.73), |
| ) |
| ) |
|
|
| workload = OfflineWorkload( |
| inference_data=inference_data, |
| replica_schedules=replica_schedules, |
|
|
| training=TrainingRun( |
| n_gpus=max(1, int(24 * scale)), |
| trace=training_trace, |
| target_peak_W_per_gpu=400.0, |
| ).at( |
| t_start=t0, |
| t_end=t1, |
| ), |
| ) |
|
|
| return OfflineDatacenter( |
| _DC_CONFIG, |
| workload, |
| dt_s=Fraction(1, 10), |
| seed=0, |
| name="baseline", |
| total_gpu_capacity=1000, |
| power_augmentation=PowerAugmentationConfig( |
| amplitude_scale_range=(0.88, 1.12), |
| noise_fraction=0.04, |
| ), |
| ) |
|
|
|
|
| """ |
| Build datacenter workload from GPU trace. |
| Returns (datacenter, raw_power_W_list) |
| |
| """ |
| def _build_dc_from_real_trace( |
| model_label: str, |
| num_gpus: int, |
| max_num_seqs: int, |
| num_replicas: int, |
| duration_s: int, |
| ) -> tuple[OfflineDatacenter, list[float]]: |
|
|
| power_W = _get_trace_power(model_label, num_gpus, max_num_seqs, num_replicas) |
|
|
| target_steps = int(duration_s / 0.1) |
| if len(power_W) < target_steps: |
| repeats = math.ceil(target_steps / len(power_W)) |
| power_W = (power_W * repeats)[:target_steps] |
| else: |
| power_W = power_W[:target_steps] |
|
|
| df = _load_traces_index() |
| row = df[df["model_label"] == model_label].iloc[0] |
|
|
| model_tuple = tuple(m for m in _MODELS if m.model_label == model_label) |
| inference_data = InferenceData.load(_DATA_DIR, model_tuple) |
|
|
| workload = OfflineWorkload( |
| inference_data=inference_data, |
| replica_schedules={ |
| model_label: ReplicaSchedule(initial=num_replicas) |
| }, |
| ) |
|
|
| |
| actual_gpu_count = num_replicas * num_gpus |
| gpu_capacity = max(1000, actual_gpu_count * 2) |
|
|
| dc = OfflineDatacenter( |
| _DC_CONFIG, workload, dt_s=Fraction(1, 10), seed=0, |
| name=model_label.replace(".", "-"), |
| total_gpu_capacity=gpu_capacity, |
| power_augmentation=PowerAugmentationConfig( |
| amplitude_scale_range=(1.0, 1.0), |
| noise_fraction=0.0, |
| ), |
| ) |
| return dc, power_W |
|
|
| """Create IEEE 13-bus grid with datacenter connection.""" |
| def _build_grid(tap_pu: float, dc_bus: str) -> OpenDSSGrid: |
| return OpenDSSGrid( |
| dss_case_dir=str(DSS_DIR), |
| dss_master_file=DSS_MASTER, |
| dt_s=Fraction(1), |
| source_pu=tap_pu, |
| initial_tap_position=INITIAL_TAPS, |
| ) |
|
|
|
|
|
|
def _make_tap(v: float):
    """Build a TapPosition with all three phases set to *v*, applied at t=0."""
    uniform = TapPosition(a=v, b=v, c=v)
    return uniform.at(t=0)
|
|
| """Run datacenter + grid simulation.""" |
| def _run(dc, grid, tap_pu, dc_bus, duration_s): |
| grid.attach_dc(dc, bus=dc_bus, connection_type="wye", power_factor=_DC_CONFIG.power_factor) |
| coord = Coordinator( |
| datacenters=[dc], |
| grid=grid, |
| controllers=[TapScheduleController( |
| schedule=TAP_CHANGE_SCHEDULE, |
| dt_s=Fraction(1) |
| )], |
| total_duration_s=duration_s, |
| ) |
| return coord.run() |
|
|
| """ |
| Runs one full simulation job (datacenter + grid) in a worker process |
| and returns results for the API. |
| """ |
| def _run_full(req_dict: dict) -> dict: |
|
|
|
|
| dc_bus = BUS_INDEX_TO_NAME.get(req_dict["targetBus"], "671") |
| replicas = max(1, req_dict["numReplicas"]) |
|
|
| dc, raw_power_W = _build_dc_from_real_trace( |
| model_label = req_dict["modelLabel"], |
| num_gpus = req_dict["numGpus"], |
| max_num_seqs = req_dict["maxNumSeqs"], |
| num_replicas = replicas, |
| duration_s = req_dict["durationS"], |
| ) |
| grid = _build_grid(req_dict["substationVoltage"], dc_bus) |
| log = _run(dc, grid, req_dict["substationVoltage"], dc_bus, req_dict["durationS"]) |
|
|
| step = max(1, req_dict["sampleInterval"]) |
| gs_sampled = log.grid_states[::step] |
| t_sampled = list(log.time_s[::step]) |
| dc_states = log.dc_states |
|
|
| results = [] |
| for i, (t, gs) in enumerate(zip(t_sampled, gs_sampled)): |
| vs = _voltages(gs) |
| dc_i = min(range(len(dc_states)), key=lambda j: abs(dc_states[j].time_s - t)) |
| ds = dc_states[dc_i] |
| kw = float((ds.power_w.a + ds.power_w.b + ds.power_w.c) / 1000) |
| |
| |
| if math.isnan(kw): kw = 0.0 |
| trace_idx = min(int(t / 0.1), len(raw_power_W) - 1) if raw_power_W else 0 |
| raw_kw = raw_power_W[trace_idx] / 1000.0 if raw_power_W else kw |
| results.append({ |
| "time": float(t), |
| "gpu_power_W": kw * 1000, |
| "gpu_power_kW": kw, |
| "gpu_power_raw_kW": raw_kw, |
| "gpu_reactive_kVAR": kw * 0.329, |
| "active_gpus": replicas * req_dict["numGpus"], |
| "voltages": vs, |
| "min_voltage": min(vs), |
| "max_voltage": max(vs), |
| "target_bus_voltage": vs[req_dict["targetBus"] - 1], |
| "total_load_kW": kw, |
| }) |
|
|
| return { |
| "numSamples": len(results), |
| "targetBus": req_dict["targetBus"], |
| "modelLabel": req_dict["modelLabel"], |
| "numGpus": req_dict["numGpus"], |
| |
| |
| "maxNumSeqs": req_dict["maxNumSeqs"], |
| "numReplicas": replicas, |
| "duration": float(max(r["time"] for r in results) if results else 0), |
| "minVoltage": float(min(r["min_voltage"] for r in results) if results else 1.0), |
| "maxVoltage": float(max(r["max_voltage"] for r in results) if results else 1.0), |
| "avgGpuPower": float(sum(r["gpu_power_W"] for r in results) / len(results) if results else 0), |
| "peakGpuPower": float(max(r["gpu_power_W"] for r in results) if results else 0), |
| "timeSeries": results, |
| } |
| |
| |
| |
|
|
| """Get per-bus voltage (worst phase per bus).""" |
| def _voltages(gs) -> list[float]: |
| result = [] |
| none_count = 0 |
| for name in BUSES_ORDERED: |
| try: |
| tp = gs.voltages[name] |
| vals = [float(v) for v in [tp.a, tp.b, tp.c] |
| if not math.isnan(float(v)) and 0.5 < float(v) < 1.5] |
| result.append(min(vals) if vals else None) |
| except Exception as e: |
| logger.debug(f"Bus {name} voltage unavailable: {e}") |
| result.append(None) |
| |
| none_count = sum(1 for v in result if v is None) |
| if none_count > 3: |
| logger.warning(f"OpenDSS convergence failure: {none_count}/13 buses returned None") |
| |
| known = [v for v in result if v is not None] |
| avg = sum(known) / len(known) if known else 1.0 |
| return [v if v is not None else avg for v in result] |
|
|
|
|
| |
|
|
# FastAPI app with permissive CORS so any frontend origin can call the API.
app = FastAPI()
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,  # wildcard origins require credentials disabled
    allow_methods=["*"],
    allow_headers=["*"],
)
|
|
|
|
|
|
class PowerflowRequest(BaseModel):
    """Request body for /api/powerflow (baseline run, no AI workload)."""
    substationVoltage: float = 1.05  # source voltage, per-unit
    numBuses: int = 13
    baseVoltage: float = 4.16  # presumably kV — confirm against frontend usage
    targetBus: int = 0
|
|
class LLMImpactRequest(BaseModel):
    """Request body for /api/llm-impact and /ws/sim-stream."""
    targetBus: int = 9  # 1-based bus index (see BUS_INDEX_TO_NAME)
    sampleInterval: int = 1  # keep every Nth tick in the response
    substationVoltage: float = 1.05  # source voltage, per-unit
    modelLabel: str = "Llama-3.1-8B"
    numGpus: int = 1  # GPUs per replica (trace index key)
    maxNumSeqs: int = 128  # max batch size (trace index key)
    numReplicas: int = 1
    durationS: int = 300  # simulated seconds
|
|
class HeatmapRequest(BaseModel):
    """Request body for /api/heatmap."""
    voltages: list[float]  # must be exactly 13 per-bus voltages, in bus order
    dataCenterBus: Optional[int] = None  # forwarded to generate_heatmap.py when set
|
|
|
|
@app.get("/api/health")
def health():
    """Liveness probe: reports whether the trace data directory is present."""
    payload = {
        "status": "ok",
        "data_ready": _DATA_DIR.exists(),
        "message": "gpu2grid OpenDSS server",
    }
    return payload
|
|
|
|
|
|
|
|
| |
| |
|
|
|
|
| """Return available traces""" |
| @app.get("/api/traces") |
| def list_traces(): |
| |
| df = _load_traces_index() |
| if df.empty: |
| return {"traces": [], "models": [], "trainingAvailable": False} |
|
|
| traces = df[["model_label","num_gpus","max_num_seqs"]].to_dict("records") |
|
|
| models = [] |
| for model_label, group in df.groupby("model_label"): |
| models.append({ |
| "modelLabel": model_label, |
| "numGpus": int(group["num_gpus"].iloc[0]), |
| "batchSizes": sorted(group["max_num_seqs"].tolist()), |
| }) |
|
|
| training_available = (_DATA_DIR / "training_trace.csv").exists() |
|
|
| return { |
| "traces": traces, |
| "models": models, |
| "trainingAvailable": training_available, |
| "dataDir": str(_DATA_DIR), |
| } |
|
|
|
|
| """Baseline grid simulation, no workload""" |
| @app.post("/api/powerflow") |
| async def powerflow(req: PowerflowRequest): |
| logger.info(f"Powerflow request v={req.substationVoltage}") |
| try: |
| dc = _build_dc(scale=0.001, duration_s=5) |
| grid = _build_grid(req.substationVoltage, "671") |
| log = _run(dc, grid, req.substationVoltage, "671", 5) |
| vs = _voltages(log.grid_states[-1]) |
| logger.info(f"Powerflow result min={min(vs):.4f} max={max(vs):.4f}") |
| return {"buses": [{"id": i+1, "voltage": v, "activePower": 0.0, |
| "reactivePower": 0.0} for i, v in enumerate(vs)], |
| "lines": []} |
| except Exception as e: |
| logger.exception("Powerflow failed") |
| raise HTTPException(status_code=500, detail=str(e)) |
|
|
|
|
|
|
| """Simulate AI workload impact on grid using GPU traces.""" |
| @app.post("/api/llm-impact") |
| async def llm_impact(req: LLMImpactRequest): |
| logger.info(f"Simulation request: {req.modelLabel} bus={req.targetBus}") |
|
|
| try: |
| loop = asyncio.get_event_loop() |
| result = await loop.run_in_executor(_pool, _run_full, req.dict()) |
| return result |
|
|
| except Exception as e: |
| logger.exception("Simulation failed") |
| raise HTTPException(status_code=500, detail=str(e)) |
|
|
|
|
|
|
@app.post("/api/heatmap")
async def heatmap(req: HeatmapRequest):
    """Render a voltage heatmap PNG by shelling out to generate_heatmap.py.

    Raises:
        HTTPException(400): if the request does not carry exactly 13 voltages.
    """
    if len(req.voltages) != 13:
        raise HTTPException(400, f"Need 13 voltages, got {len(req.voltages)}")
    script = str(Path(__file__).parent / "generate_heatmap.py")
    with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as f:
        out = f.name
    try:
        subprocess.run(
            ["python3", script, out] + [str(v) for v in req.voltages] +
            ([str(req.dataCenterBus)] if req.dataCenterBus else []),
            check=True,
        )
        with open(out, "rb") as fh:
            png = fh.read()
    finally:
        # Always remove the temp file, even when rendering fails.
        os.unlink(out)
    return Response(content=png, media_type="image/png")
|
|
|
|
| |
| |
| |
| |
| def _serialize_tick(tick, req_dict: dict, raw_power_W: list[float]) -> dict: |
| """Serialize one TickOutput to a small JSON-safe dict for the frontend.""" |
| t = tick.t_s |
|
|
| |
| voltages = None |
| min_v = max_v = target_v = None |
| if tick.grid_state is not None: |
| voltages = _voltages(tick.grid_state) |
| min_v = min(voltages) |
| max_v = max(voltages) |
| target_v = voltages[req_dict["targetBus"] - 1] |
|
|
| |
| kw = 0.0 |
| batch_by_model: dict[str, int] = {} |
| for dc_name, ds in tick.dc_states.items(): |
| pw = ds.power_w |
| kw += float((pw.a + pw.b + pw.c) / 1000) |
| if hasattr(ds, "batch_size_by_model"): |
| batch_by_model.update(ds.batch_size_by_model) |
| if math.isnan(kw): |
| kw = 0.0 |
|
|
| trace_idx = min(int(t / 0.1), len(raw_power_W) - 1) if raw_power_W else 0 |
| raw_kw = raw_power_W[trace_idx] / 1000.0 if raw_power_W else kw |
|
|
| events = [{"type": e.event_type, "data": e.data} for e in tick.sim_events] |
|
|
| return { |
| "time": float(t), |
| "gpu_power_kW": kw, |
| "gpu_power_raw_kW": raw_kw, |
| "active_gpus": req_dict["numReplicas"] * req_dict["numGpus"], |
| "batch_by_model": batch_by_model, |
| "voltages": voltages, |
| "min_voltage": min_v, |
| "max_voltage": max_v, |
| "target_bus_voltage": target_v, |
| "sim_events": events, |
| } |
|
|
|
|
def _run_streaming(req_dict: dict):
    """Generator: builds coordinator and yields serialized tick dicts."""
    bus_name = BUS_INDEX_TO_NAME.get(req_dict["targetBus"], "671")
    n_replicas = max(1, req_dict["numReplicas"])

    dc, trace_power_W = _build_dc_from_real_trace(
        model_label=req_dict["modelLabel"],
        num_gpus=req_dict["numGpus"],
        max_num_seqs=req_dict["maxNumSeqs"],
        num_replicas=n_replicas,
        duration_s=req_dict["durationS"],
    )

    grid = _build_grid(req_dict["substationVoltage"], bus_name)
    grid.attach_dc(
        dc,
        bus=bus_name,
        connection_type="wye",
        power_factor=_DC_CONFIG.power_factor,
    )

    tap_controller = TapScheduleController(
        schedule=TAP_CHANGE_SCHEDULE,
        dt_s=Fraction(1),
    )
    coord = Coordinator(
        datacenters=[dc],
        grid=grid,
        controllers=[tap_controller],
        total_duration_s=req_dict["durationS"],
    )

    every = max(1, req_dict["sampleInterval"])
    try:
        for tick_num, tick in enumerate(coord.run_iter()):
            if tick_num % every == 0:
                yield _serialize_tick(tick, req_dict, trace_power_W)
    finally:
        # Release simulator resources even if the consumer stops early.
        coord.stop()
|
|
@app.websocket("/ws/sim-stream")
async def sim_stream(ws: WebSocket):
    """Send simulation results to the client over a websocket.

    The client sends one JSON message shaped like LLMImpactRequest. The full
    simulation currently runs to completion in a worker process; the sampled
    rows are then sent one-by-one, followed by {"done": true}. Errors are
    reported best-effort as {"error": ...}.
    """
    await ws.accept()
    try:
        req_dict = await ws.receive_json()
        req = LLMImpactRequest(**req_dict)
        logger.info(f"WS stream: {req.modelLabel} bus={req.targetBus}")

        # get_running_loop() replaces the deprecated get_event_loop()
        # inside coroutines.
        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(_pool, _run_full, req.dict())

        for row in result["timeSeries"]:
            await ws.send_json(row)

        await ws.send_json({"done": True})

    except WebSocketDisconnect:
        logger.info("WS client disconnected")
    except Exception as e:
        logger.exception("WS stream failed")
        # Best-effort error report; the socket may already be closed.
        try:
            await ws.send_json({"error": str(e)})
        except Exception:
            pass
# Script entry point: log data availability, then start the API server.
if __name__ == "__main__":
    logger.info("=" * 70)
    logger.info(f"Data dir: {_DATA_DIR} ready={_DATA_DIR.exists()}")
    df = _load_traces_index()
    if not df.empty:
        models = df["model_label"].unique().tolist()
        logger.info(f"Models: {models}")
        logger.info(f"Traces: {len(df)} configurations")
    logger.info("=" * 70)
    # ws_ping_interval=None disables websocket keepalive pings — presumably so
    # long-running simulation streams are not dropped; confirm before changing.
    uvicorn.run("server:app", host="0.0.0.0", port=8080, workers=1, log_level="info", ws_ping_interval=None)
|
|