|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
""" |
|
|
Echo Environment Client. |
|
|
|
|
|
This module provides the client for connecting to an Echo Environment server |
|
|
via WebSocket for persistent sessions. |
|
|
""" |
|
|
|
|
|
from typing import Any, Dict |
|
|
|
|
|
|
|
|
try: |
|
|
|
|
|
from openenv.core.client_types import StepResult |
|
|
from openenv.core.env_server.types import State |
|
|
from openenv.core.env_client import EnvClient |
|
|
from .models import EchoAction, EchoObservation |
|
|
except ImportError: |
|
|
|
|
|
from openenv.core.client_types import StepResult |
|
|
from openenv.core.env_server.types import State |
|
|
from openenv.core.env_client import EnvClient |
|
|
from models import EchoAction, EchoObservation |
|
|
|
|
|
|
|
|
class EchoEnv(EnvClient[EchoAction, EchoObservation, State]): |
|
|
""" |
|
|
Client for the Echo Environment. |
|
|
|
|
|
This client maintains a persistent WebSocket connection to the environment |
|
|
server, enabling efficient multi-step interactions with lower latency. |
|
|
Each client instance has its own dedicated environment session on the server. |
|
|
|
|
|
Example: |
|
|
>>> # Connect to a running server |
|
|
>>> with EchoEnv(base_url="http://localhost:8000") as client: |
|
|
... result = client.reset() |
|
|
... print(result.observation.echoed_message) |
|
|
... |
|
|
... result = client.step(EchoAction(message="Hello!")) |
|
|
... print(result.observation.echoed_message) |
|
|
... print(result.reward) |
|
|
|
|
|
Example with Docker: |
|
|
>>> # Automatically start container and connect |
|
|
>>> client = EchoEnv.from_docker_image("echo-env:latest") |
|
|
>>> try: |
|
|
... result = client.reset() |
|
|
... result = client.step(EchoAction(message="Test")) |
|
|
... finally: |
|
|
... client.close() |
|
|
""" |
|
|
|
|
|
def _step_payload(self, action: EchoAction) -> Dict: |
|
|
""" |
|
|
Convert EchoAction to JSON payload for step request. |
|
|
|
|
|
Args: |
|
|
action: EchoAction instance |
|
|
|
|
|
Returns: |
|
|
Dictionary representation suitable for JSON encoding |
|
|
""" |
|
|
return { |
|
|
"message": action.message, |
|
|
} |
|
|
|
|
|
def _parse_result(self, payload: Dict) -> StepResult[EchoObservation]: |
|
|
""" |
|
|
Parse server response into StepResult[EchoObservation]. |
|
|
|
|
|
Args: |
|
|
payload: JSON response from server |
|
|
|
|
|
Returns: |
|
|
StepResult with EchoObservation |
|
|
""" |
|
|
obs_data = payload.get("observation", {}) |
|
|
observation = EchoObservation( |
|
|
echoed_message=obs_data.get("echoed_message", ""), |
|
|
message_length=obs_data.get("message_length", 0), |
|
|
done=payload.get("done", False), |
|
|
reward=payload.get("reward"), |
|
|
metadata=obs_data.get("metadata", {}), |
|
|
) |
|
|
|
|
|
return StepResult( |
|
|
observation=observation, |
|
|
reward=payload.get("reward"), |
|
|
done=payload.get("done", False), |
|
|
) |
|
|
|
|
|
def _parse_state(self, payload: Dict) -> State: |
|
|
""" |
|
|
Parse server response into State object. |
|
|
|
|
|
Args: |
|
|
payload: JSON response from /state endpoint |
|
|
|
|
|
Returns: |
|
|
State object with episode_id and step_count |
|
|
""" |
|
|
return State( |
|
|
episode_id=payload.get("episode_id"), |
|
|
step_count=payload.get("step_count", 0), |
|
|
) |
|
|
|