# live/server.py
# Deployed by github-actions[bot]: sync from GitHub 2026-05-13T22:41:47Z (commit b598e06)
"""
server.py
Runs a simulation between AI datacenter workloads and an electrical grid (IEEE 13-bus OpenDSS model).
Uses GPU power traces and workloads to model how AI inference/training affects grid voltage and stability over time.
"""
from fractions import Fraction
from pathlib import Path
import subprocess, tempfile, os, uvicorn, threading, math, json, hashlib
import pandas as pd
from fastapi import FastAPI, HTTPException, Response
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import Optional
from fastapi import WebSocket, WebSocketDisconnect
from openg2g.coordinator import Coordinator
from openg2g.datacenter.config import (
DatacenterConfig,
InferenceModelSpec,
PowerAugmentationConfig,
TrainingRun,
ReplicaSchedule,
)
from openg2g.datacenter.offline import OfflineDatacenter, OfflineWorkload
from openg2g.datacenter.workloads.inference import InferenceData
from openg2g.datacenter.workloads.training import TrainingTrace, TrainingTraceParams
from openg2g.grid.opendss import OpenDSSGrid
from openg2g.grid.config import TapPosition
from openg2g.controller.tap_schedule import TapScheduleController
from openg2g.metrics.voltage import compute_allbus_voltage_stats
import logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)s [%(name)s] %(message)s",
)
logger = logging.getLogger(__name__)
import asyncio, time
from concurrent.futures import ProcessPoolExecutor
import json
# Worker pool for CPU-bound simulation jobs; capped at 2 workers for the
# Hugging Face free tier.
_pool = ProcessPoolExecutor(max_workers=2)
_start_time = time.time()
# OpenDSS IEEE 13-bus case files shipped with the app.
DSS_DIR = Path(__file__).parent / "examples/ieee13"
DSS_MASTER = "IEEE13Nodeckt.dss"
CONFIG_PATH = Path(__file__).parent / "examples/offline/config.json"
# Maps IEEE 13-bus indices (1-based, as used by the frontend) to OpenDSS bus names
BUS_INDEX_TO_NAME = {
    1:"650", 2:"632", 3:"633", 4:"645", 5:"646", 6:"671",
    7:"684", 8:"611", 9:"634", 10:"675", 11:"652", 12:"680", 13:"692",
}
# Bus names in frontend index order (index i-1 ↔ bus i).
BUSES_ORDERED = [BUS_INDEX_TO_NAME[i] for i in range(1, 14)]
# Load simulation config and model specs once at import time.
_config_raw = json.loads(CONFIG_PATH.read_text())
_MODELS = tuple(InferenceModelSpec(**m) for m in _config_raw["models"])
_DC_CONFIG = DatacenterConfig(gpus_per_server=8, base_kw_per_phase=500.0)
# Data dir comes from config when set; otherwise fall back to the bundled specs.
if _config_raw.get("data_dir"):
    _DATA_DIR = Path(_config_raw["data_dir"])
else:
    _DATA_DIR = Path(__file__).parent / "data/specs"
# Load traces_summary.csv once at startup so we can quickly look up trace files
_TRACES_SUMMARY_PATH = _DATA_DIR / "traces_summary.csv"
# Cached dataframe of available GPU power traces (populated lazily by _load_traces_index).
_traces_df: pd.DataFrame | None = None
# One regulator tap step in per-unit (0.00625 pu = 0.625% per step).
TAP_STEP = 0.00625
# Per-phase regulator starting positions.
INITIAL_TAPS = TapPosition(
    a=1.0 + 14 * TAP_STEP,
    b=1.0 + 6 * TAP_STEP,
    c=1.0 + 15 * TAP_STEP,
)
# Scheduled tap changes during the run.
# Rescaled to fit in 300s window (original is 3600s, we compress ~12x)
TAP_CHANGE_SCHEDULE = (
    TapPosition(
        a=1.0 + 16 * TAP_STEP,
        b=1.0 + 6 * TAP_STEP,
        c=1.0 + 17 * TAP_STEP,
    ).at(t=75)  # was 1500s → 75s at 12x compression
    | TapPosition(
        a=1.0 + 10 * TAP_STEP,
        b=1.0 + 6 * TAP_STEP,
        c=1.0 + 10 * TAP_STEP,
    ).at(t=200)  # was 3300s → 200s at 12x compression
)
"""
Load trace index CSV and cache it.
"""
def _load_traces_index() -> pd.DataFrame:
global _traces_df
if _traces_df is None:
if _TRACES_SUMMARY_PATH.exists():
_traces_df = pd.read_csv(_TRACES_SUMMARY_PATH)
else:
_traces_df = pd.DataFrame(columns=["model_label","num_gpus","max_num_seqs","trace_file"])
return _traces_df
"""
Lookup GPU power trace and scale by replica count.
Returns a list of per-timestep total power values in watts.
"""
def _get_trace_power(model_label: str, num_gpus: int, max_num_seqs: int,
num_replicas: int = 1) -> list[float]:
df = _load_traces_index()
row = df[
(df["model_label"] == model_label) &
(df["num_gpus"] == num_gpus) &
(df["max_num_seqs"]== max_num_seqs)
]
if row.empty:
raise ValueError(
f"No trace found for model={model_label} num_gpus={num_gpus} "
f"max_num_seqs={max_num_seqs}. "
f"Available: {df[['model_label','num_gpus','max_num_seqs']].to_dict('records')}"
)
trace_file = _DATA_DIR / row.iloc[0]["trace_file"]
trace_df = pd.read_csv(trace_file)
power_W = trace_df["power_total_W"].tolist()
return [p * num_replicas for p in power_W]
# Log data availability and warm the trace-index cache at import time.
logger.info(f"Data dir: {_DATA_DIR} exists={_DATA_DIR.exists()}")
_load_traces_index() # load at startup
"""Datacenter workload (baseline)"""
def _build_dc(scale: float = 1.0, duration_s: int = 300) -> OfflineDatacenter:
df = _load_traces_index()
first_row = df.iloc[0]
first_model = tuple(m for m in _MODELS if m.model_label == first_row["model_label"])
inference_data = InferenceData.load(_DATA_DIR, first_model)
training_trace = TrainingTrace.ensure(
_DATA_DIR / "training_trace.csv",
TrainingTraceParams(),
)
t0 = min(40.0, duration_s * 0.13)
t1 = min(140.0, duration_s * 0.47)
replica_schedules = {}
for m in _MODELS:
initial_replicas = max(1, int(scale * 8))
reduced_replicas = max(1, int(initial_replicas * 0.25))
replica_schedules[m.model_label] = (
ReplicaSchedule(initial=initial_replicas)
.ramp_to(
reduced_replicas,
t_start=min(150.0, duration_s * 0.50),
t_end=min(220.0, duration_s * 0.73),
)
)
workload = OfflineWorkload(
inference_data=inference_data,
replica_schedules=replica_schedules,
training=TrainingRun(
n_gpus=max(1, int(24 * scale)),
trace=training_trace,
target_peak_W_per_gpu=400.0,
).at(
t_start=t0,
t_end=t1,
),
)
return OfflineDatacenter(
_DC_CONFIG,
workload,
dt_s=Fraction(1, 10),
seed=0,
name="baseline",
total_gpu_capacity=1000,
power_augmentation=PowerAugmentationConfig(
amplitude_scale_range=(0.88, 1.12),
noise_fraction=0.04,
),
)
"""
Build datacenter workload from GPU trace.
Returns (datacenter, raw_power_W_list)
"""
def _build_dc_from_real_trace(
model_label: str,
num_gpus: int,
max_num_seqs: int,
num_replicas: int,
duration_s: int,
) -> tuple[OfflineDatacenter, list[float]]:
power_W = _get_trace_power(model_label, num_gpus, max_num_seqs, num_replicas)
target_steps = int(duration_s / 0.1)
if len(power_W) < target_steps:
repeats = math.ceil(target_steps / len(power_W))
power_W = (power_W * repeats)[:target_steps]
else:
power_W = power_W[:target_steps]
df = _load_traces_index()
row = df[df["model_label"] == model_label].iloc[0]
model_tuple = tuple(m for m in _MODELS if m.model_label == model_label)
inference_data = InferenceData.load(_DATA_DIR, model_tuple)
workload = OfflineWorkload(
inference_data=inference_data,
replica_schedules={
model_label: ReplicaSchedule(initial=num_replicas)
},
)
# ← compute actual GPU count and add headroom
actual_gpu_count = num_replicas * num_gpus
gpu_capacity = max(1000, actual_gpu_count * 2)
dc = OfflineDatacenter(
_DC_CONFIG, workload, dt_s=Fraction(1, 10), seed=0,
name=model_label.replace(".", "-"),
total_gpu_capacity=gpu_capacity, # ← was hardcoded 1000
power_augmentation=PowerAugmentationConfig(
amplitude_scale_range=(1.0, 1.0),
noise_fraction=0.0,
),
)
return dc, power_W
"""Create IEEE 13-bus grid with datacenter connection."""
def _build_grid(tap_pu: float, dc_bus: str) -> OpenDSSGrid:
return OpenDSSGrid(
dss_case_dir=str(DSS_DIR),
dss_master_file=DSS_MASTER,
dt_s=Fraction(1),
source_pu=tap_pu,
initial_tap_position=INITIAL_TAPS,
)
def _make_tap(v: float):
    """Build a uniform three-phase tap position applied at t=0."""
    return TapPosition(a=v, b=v, c=v).at(t=0)
"""Run datacenter + grid simulation."""
def _run(dc, grid, tap_pu, dc_bus, duration_s):
grid.attach_dc(dc, bus=dc_bus, connection_type="wye", power_factor=_DC_CONFIG.power_factor)
coord = Coordinator(
datacenters=[dc],
grid=grid,
controllers=[TapScheduleController(
schedule=TAP_CHANGE_SCHEDULE, # ← real schedule
dt_s=Fraction(1)
)],
total_duration_s=duration_s,
)
return coord.run()
"""
Runs one full simulation job (datacenter + grid) in a worker process
and returns results for the API.
"""
def _run_full(req_dict: dict) -> dict:
dc_bus = BUS_INDEX_TO_NAME.get(req_dict["targetBus"], "671")
replicas = max(1, req_dict["numReplicas"])
dc, raw_power_W = _build_dc_from_real_trace(
model_label = req_dict["modelLabel"],
num_gpus = req_dict["numGpus"],
max_num_seqs = req_dict["maxNumSeqs"],
num_replicas = replicas,
duration_s = req_dict["durationS"],
)
grid = _build_grid(req_dict["substationVoltage"], dc_bus)
log = _run(dc, grid, req_dict["substationVoltage"], dc_bus, req_dict["durationS"])
step = max(1, req_dict["sampleInterval"])
gs_sampled = log.grid_states[::step]
t_sampled = list(log.time_s[::step])
dc_states = log.dc_states
results = []
for i, (t, gs) in enumerate(zip(t_sampled, gs_sampled)):
vs = _voltages(gs)
dc_i = min(range(len(dc_states)), key=lambda j: abs(dc_states[j].time_s - t))
ds = dc_states[dc_i]
kw = float((ds.power_w.a + ds.power_w.b + ds.power_w.c) / 1000)
if math.isnan(kw): kw = 0.0
trace_idx = min(int(t / 0.1), len(raw_power_W) - 1) if raw_power_W else 0
raw_kw = raw_power_W[trace_idx] / 1000.0 if raw_power_W else kw
results.append({
"time": float(t),
"gpu_power_W": kw * 1000,
"gpu_power_kW": kw,
"gpu_power_raw_kW": raw_kw,
"gpu_reactive_kVAR": kw * 0.329,
"active_gpus": replicas * req_dict["numGpus"],
"voltages": vs,
"min_voltage": min(vs),
"max_voltage": max(vs),
"target_bus_voltage": vs[req_dict["targetBus"] - 1],
"total_load_kW": kw,
})
return {
"numSamples": len(results),
"targetBus": req_dict["targetBus"],
"modelLabel": req_dict["modelLabel"],
"numGpus": req_dict["numGpus"],
"maxNumSeqs": req_dict["maxNumSeqs"],
"numReplicas": replicas,
"duration": float(max(r["time"] for r in results) if results else 0),
"minVoltage": float(min(r["min_voltage"] for r in results) if results else 1.0),
"maxVoltage": float(max(r["max_voltage"] for r in results) if results else 1.0),
"avgGpuPower": float(sum(r["gpu_power_W"] for r in results) / len(results) if results else 0),
"peakGpuPower": float(max(r["gpu_power_W"] for r in results) if results else 0),
"timeSeries": results,
}
"""Get per-bus voltage (worst phase per bus)."""
def _voltages(gs) -> list[float]:
result = []
none_count = 0
for name in BUSES_ORDERED:
try:
tp = gs.voltages[name]
vals = [float(v) for v in [tp.a, tp.b, tp.c]
if not math.isnan(float(v)) and 0.5 < float(v) < 1.5]
result.append(min(vals) if vals else None)
except Exception as e:
logger.debug(f"Bus {name} voltage unavailable: {e}")
result.append(None)
none_count = sum(1 for v in result if v is None)
if none_count > 3:
logger.warning(f"OpenDSS convergence failure: {none_count}/13 buses returned None")
known = [v for v in result if v is not None]
avg = sum(known) / len(known) if known else 1.0
return [v if v is not None else avg for v in result]
# ── FastAPI app and middleware ────────────────────────────────────────────
app = FastAPI()
# Permissive CORS: any origin/method/header, no credentials.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)
class PowerflowRequest(BaseModel):
    """Request body for the baseline /api/powerflow endpoint."""
    # Substation source voltage in per-unit.
    substationVoltage: float = 1.05
    # NOTE(review): numBuses, baseVoltage and targetBus are not read by the
    # powerflow handler; they appear to exist for frontend symmetry.
    numBuses: int = 13
    baseVoltage: float = 4.16
    targetBus: int = 0
class LLMImpactRequest(BaseModel):
    """Request body for /api/llm-impact and the /ws/sim-stream websocket."""
    # 1-based IEEE 13-bus index where the datacenter load is attached.
    targetBus: int = 9
    # Keep every Nth tick in the returned time series.
    sampleInterval: int = 1
    # Substation source voltage in per-unit.
    substationVoltage: float = 1.05
    # Trace selection keys (must match traces_summary.csv rows).
    modelLabel: str = "Llama-3.1-8B"
    numGpus: int = 1
    maxNumSeqs: int = 128
    numReplicas: int = 1
    # Simulation length in seconds.
    durationS: int = 300
class HeatmapRequest(BaseModel):
    """Request body for /api/heatmap."""
    # Exactly 13 per-bus voltages (length validated in the handler).
    voltages: list[float]
    # Optional 1-based bus index to highlight as the datacenter location.
    dataCenterBus: Optional[int] = None
@app.get("/api/health")
def health():
return {"status": "ok", "data_ready": _DATA_DIR.exists(),
"message": "gpu2grid OpenDSS server"}
"""Return available traces"""
@app.get("/api/traces")
def list_traces():
df = _load_traces_index()
if df.empty:
return {"traces": [], "models": [], "trainingAvailable": False}
traces = df[["model_label","num_gpus","max_num_seqs"]].to_dict("records")
models = []
for model_label, group in df.groupby("model_label"):
models.append({
"modelLabel": model_label,
"numGpus": int(group["num_gpus"].iloc[0]),
"batchSizes": sorted(group["max_num_seqs"].tolist()),
})
training_available = (_DATA_DIR / "training_trace.csv").exists()
return {
"traces": traces,
"models": models,
"trainingAvailable": training_available,
"dataDir": str(_DATA_DIR),
}
"""Baseline grid simulation, no workload"""
@app.post("/api/powerflow")
async def powerflow(req: PowerflowRequest):
logger.info(f"Powerflow request v={req.substationVoltage}")
try:
dc = _build_dc(scale=0.001, duration_s=5)
grid = _build_grid(req.substationVoltage, "671")
log = _run(dc, grid, req.substationVoltage, "671", 5)
vs = _voltages(log.grid_states[-1])
logger.info(f"Powerflow result min={min(vs):.4f} max={max(vs):.4f}")
return {"buses": [{"id": i+1, "voltage": v, "activePower": 0.0,
"reactivePower": 0.0} for i, v in enumerate(vs)],
"lines": []}
except Exception as e:
logger.exception("Powerflow failed")
raise HTTPException(status_code=500, detail=str(e))
"""Simulate AI workload impact on grid using GPU traces."""
@app.post("/api/llm-impact")
async def llm_impact(req: LLMImpactRequest):
logger.info(f"Simulation request: {req.modelLabel} bus={req.targetBus}")
try:
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(_pool, _run_full, req.dict())
return result
except Exception as e:
logger.exception("Simulation failed")
raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/heatmap")
async def heatmap(req: HeatmapRequest):
if len(req.voltages) != 13:
raise HTTPException(400, f"Need 13 voltages, got {len(req.voltages)}")
script = str(Path(__file__).parent / "generate_heatmap.py")
with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as f:
out = f.name
subprocess.run(
["python3", script, out] + [str(v) for v in req.voltages] +
([str(req.dataCenterBus)] if req.dataCenterBus else []),
check=True,
)
png = open(out, "rb").read()
os.unlink(out)
return Response(content=png, media_type="image/png")
def _serialize_tick(tick, req_dict: dict, raw_power_W: list[float]) -> dict:
"""Serialize one TickOutput to a small JSON-safe dict for the frontend."""
t = tick.t_s
# Grid voltages (may be None if grid didn't tick this step)
voltages = None
min_v = max_v = target_v = None
if tick.grid_state is not None:
voltages = _voltages(tick.grid_state)
min_v = min(voltages)
max_v = max(voltages)
target_v = voltages[req_dict["targetBus"] - 1]
# DC power (sum all sites)
kw = 0.0
batch_by_model: dict[str, int] = {}
for dc_name, ds in tick.dc_states.items():
pw = ds.power_w
kw += float((pw.a + pw.b + pw.c) / 1000)
if hasattr(ds, "batch_size_by_model"):
batch_by_model.update(ds.batch_size_by_model)
if math.isnan(kw):
kw = 0.0
trace_idx = min(int(t / 0.1), len(raw_power_W) - 1) if raw_power_W else 0
raw_kw = raw_power_W[trace_idx] / 1000.0 if raw_power_W else kw
events = [{"type": e.event_type, "data": e.data} for e in tick.sim_events]
return {
"time": float(t),
"gpu_power_kW": kw,
"gpu_power_raw_kW": raw_kw,
"active_gpus": req_dict["numReplicas"] * req_dict["numGpus"],
"batch_by_model": batch_by_model,
"voltages": voltages,
"min_voltage": min_v,
"max_voltage": max_v,
"target_bus_voltage": target_v,
"sim_events": events,
}
def _run_streaming(req_dict: dict):
    """Generator: build the co-simulation and yield serialized tick dicts."""
    target_bus = req_dict["targetBus"]
    dc_bus = BUS_INDEX_TO_NAME.get(target_bus, "671")
    replica_count = max(1, req_dict["numReplicas"])
    dc, raw_power_W = _build_dc_from_real_trace(
        model_label=req_dict["modelLabel"],
        num_gpus=req_dict["numGpus"],
        max_num_seqs=req_dict["maxNumSeqs"],
        num_replicas=replica_count,
        duration_s=req_dict["durationS"],
    )
    grid = _build_grid(req_dict["substationVoltage"], dc_bus)
    grid.attach_dc(
        dc,
        bus=dc_bus,
        connection_type="wye",
        power_factor=_DC_CONFIG.power_factor,
    )
    tap_controller = TapScheduleController(
        schedule=TAP_CHANGE_SCHEDULE,
        dt_s=Fraction(1),
    )
    coord = Coordinator(
        datacenters=[dc],
        grid=grid,
        controllers=[tap_controller],
        total_duration_s=req_dict["durationS"],
    )
    sample_every = max(1, req_dict["sampleInterval"])
    try:
        # Emit every Nth tick, serialized for the frontend.
        for tick_num, tick in enumerate(coord.run_iter()):
            if tick_num % sample_every == 0:
                yield _serialize_tick(tick, req_dict, raw_power_W)
    finally:
        # Release coordinator resources even if the generator is closed early.
        coord.stop()
@app.websocket("/ws/sim-stream")
async def sim_stream(ws: WebSocket):
    """Run a simulation and replay its sampled time series over a websocket.

    Note: the simulation runs to completion in the worker pool first; rows
    are then sent one by one (replay of a finished run, not live streaming).
    """
    await ws.accept()
    try:
        req_dict = await ws.receive_json()
        req = LLMImpactRequest(**req_dict)
        logger.info(f"WS stream: {req.modelLabel} bus={req.targetBus}")
        # Run full simulation in process pool (separate process = safe for OpenDSS).
        # get_running_loop() replaces the deprecated get_event_loop() here.
        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(_pool, _run_full, req.dict())
        for row in result["timeSeries"]:
            await ws.send_json(row)
        await ws.send_json({"done": True})
    except WebSocketDisconnect:
        logger.info("WS client disconnected")
    except Exception as e:
        logger.exception("WS stream failed")
        # Best effort: the socket may already be closed.
        try:
            await ws.send_json({"error": str(e)})
        except Exception:
            pass
if __name__ == "__main__":
    # Startup banner: log data availability and loaded trace configurations.
    logger.info("=" * 70)
    logger.info(f"Data dir: {_DATA_DIR} ready={_DATA_DIR.exists()}")
    df = _load_traces_index()
    if not df.empty:
        models = df["model_label"].unique().tolist()
        logger.info(f"Models: {models}")
        logger.info(f"Traces: {len(df)} configurations")
    logger.info("=" * 70)
    # Single uvicorn worker: CPU-heavy jobs are offloaded to _pool instead.
    uvicorn.run("server:app", host="0.0.0.0", port=8080, workers=1, log_level="info", ws_ping_interval=None)