echo_env / client.py
sergiopaniego's picture
Upload folder using huggingface_hub
f58d312 verified
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
"""
Echo Environment Client.
This module provides the client for connecting to an Echo Environment server
via WebSocket for persistent sessions.
"""
from typing import Any, Dict
# Support both in-repo and standalone imports
try:
# In-repo imports (when running from OpenEnv repository)
from openenv.core.client_types import StepResult
from openenv.core.env_server.types import State
from openenv.core.env_client import EnvClient
from .models import EchoAction, EchoObservation
except ImportError:
# Standalone imports (when environment is standalone with openenv from pip)
from openenv.core.client_types import StepResult
from openenv.core.env_server.types import State
from openenv.core.env_client import EnvClient
from models import EchoAction, EchoObservation
class EchoEnv(EnvClient[EchoAction, EchoObservation, State]):
"""
Client for the Echo Environment.
This client maintains a persistent WebSocket connection to the environment
server, enabling efficient multi-step interactions with lower latency.
Each client instance has its own dedicated environment session on the server.
Example:
>>> # Connect to a running server
>>> with EchoEnv(base_url="http://localhost:8000") as client:
... result = client.reset()
... print(result.observation.echoed_message)
...
... result = client.step(EchoAction(message="Hello!"))
... print(result.observation.echoed_message)
... print(result.reward)
Example with Docker:
>>> # Automatically start container and connect
>>> client = EchoEnv.from_docker_image("echo-env:latest")
>>> try:
... result = client.reset()
... result = client.step(EchoAction(message="Test"))
... finally:
... client.close()
"""
def _step_payload(self, action: EchoAction) -> Dict:
"""
Convert EchoAction to JSON payload for step request.
Args:
action: EchoAction instance
Returns:
Dictionary representation suitable for JSON encoding
"""
return {
"message": action.message,
}
def _parse_result(self, payload: Dict) -> StepResult[EchoObservation]:
"""
Parse server response into StepResult[EchoObservation].
Args:
payload: JSON response from server
Returns:
StepResult with EchoObservation
"""
obs_data = payload.get("observation", {})
observation = EchoObservation(
echoed_message=obs_data.get("echoed_message", ""),
message_length=obs_data.get("message_length", 0),
done=payload.get("done", False),
reward=payload.get("reward"),
metadata=obs_data.get("metadata", {}),
)
return StepResult(
observation=observation,
reward=payload.get("reward"),
done=payload.get("done", False),
)
def _parse_state(self, payload: Dict) -> State:
"""
Parse server response into State object.
Args:
payload: JSON response from /state endpoint
Returns:
State object with episode_id and step_count
"""
return State(
episode_id=payload.get("episode_id"),
step_count=payload.get("step_count", 0),
)