Skip to content

Adding default agentmode for ithor objectnav task #307

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 19 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 164 additions & 1 deletion allenact_plugins/ithor_plugin/ithor_environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@
from allenact.utils.system import get_logger
from allenact_plugins.ithor_plugin.ithor_constants import VISIBILITY_DISTANCE, FOV
from allenact_plugins.ithor_plugin.ithor_util import round_to_factor
from ai2thor.util import metrics
from allenact.utils.cache_utils import (
DynamicDistanceCache,
pos_to_str_for_cache,
str_to_pos_for_cache,
)


class IThorEnvironment(object):
Expand All @@ -31,18 +37,24 @@ class IThorEnvironment(object):

def __init__(
self,
all_metadata_available: bool = True,
x_display: Optional[str] = None,
docker_enabled: bool = False,
local_thor_build: Optional[str] = None,
visibility_distance: float = VISIBILITY_DISTANCE,
fov: float = FOV,
player_screen_width: int = 300,
player_screen_height: int = 300,
grid_size: float = 0.25,
rotate_step_degrees: int = 90,
quality: str = "Very Low",
restrict_to_initially_reachable_points: bool = False,
make_agents_visible: bool = True,
object_open_speed: float = 1.0,
simplify_physics: bool = False,
snap_to_grid: bool = True,
agent_count: int = 1,
**kwargs,
) -> None:
"""Initializer.

Expand Down Expand Up @@ -81,11 +93,14 @@ def __init__(
self.controller: Optional[Controller] = None
self._started = False
self._quality = quality
self._snap_to_grid = snap_to_grid
self.agent_count = agent_count

self._initially_reachable_points: Optional[List[Dict]] = None
self._initially_reachable_points_set: Optional[Set[Tuple[float, float]]] = None
self._move_mag: Optional[float] = None
self._grid_size: Optional[float] = None
self._grid_size: Optional[float] = grid_size
self._rotate_step_degrees = rotate_step_degrees
self._visibility_distance = visibility_distance
self._fov = fov
self.restrict_to_initially_reachable_points = (
Expand All @@ -95,10 +110,113 @@ def __init__(
self.object_open_speed = object_open_speed
self._always_return_visible_range = False
self.simplify_physics = simplify_physics
self.all_metadata_available = all_metadata_available

self.scene_to_reachable_positions: Optional[Dict[str, Any]] = None
self.distance_cache: Optional[DynamicDistanceCache] = None

self.start(None)
# noinspection PyTypeHints
if self.all_metadata_available:
self.scene_to_reachable_positions = {
self.scene_name: copy.deepcopy(self.currently_reachable_points)
}
assert len(self.scene_to_reachable_positions[self.scene_name]) > 10

self.distance_cache = DynamicDistanceCache(rounding=1)
self.controller.docker_enabled = docker_enabled # type: ignore
self._extra_teleport_kwargs: Dict[
str, Any
] = {} # Used for backwards compatability with the teleport action

def path_from_point_to_object_type(
self, point: Dict[str, float], object_type: str, allowed_error: float
) -> Optional[List[Dict[str, float]]]:
event = self.controller.step(
action="GetShortestPath",
objectType=object_type,
position=point,
allowedError=allowed_error,
)
if event.metadata["lastActionSuccess"]:
return event.metadata["actionReturn"]["corners"]
else:
get_logger().debug(
"Failed to find path for {} in {}. Start point {}, agent state {}.".format(
object_type,
self.controller.last_event.metadata["sceneName"],
point,
self.agent_state(),
)
)
return None

def distance_from_point_to_object_type(
self, point: Dict[str, float], object_type: str, allowed_error: float
) -> float:
"""Minimal geodesic distance from a point to an object of the given
type.
It might return -1.0 for unreachable targets.
"""
path = self.path_from_point_to_object_type(point, object_type, allowed_error)
if path:
# Because `allowed_error != 0` means that the path returned above might not start
# at `point`, we explicitly add any offset there is.
s_dist = math.sqrt(
(point["x"] - path[0]["x"]) ** 2 + (point["z"] - path[0]["z"]) ** 2
)
return metrics.path_distance(path) + s_dist
return -1.0

def distance_to_object_type(self, object_type: str, agent_id: int = 0) -> float:
"""Minimal geodesic distance to object of given type from agent's
current location.
It might return -1.0 for unreachable targets.
"""
assert 0 <= agent_id < self.agent_count
assert (
self.all_metadata_available
), "`distance_to_object_type` cannot be called when `self.all_metadata_available` is `False`."

def retry_dist(position: Dict[str, float], object_type: str):
allowed_error = 0.05
debug_log = ""
d = -1.0
while allowed_error < 2.5:
d = self.distance_from_point_to_object_type(
position, object_type, allowed_error
)
if d < 0:
debug_log = (
f"In scene {self.scene_name}, could not find a path from {position} to {object_type} with"
f" {allowed_error} error tolerance. Increasing this tolerance to"
f" {2 * allowed_error} any trying again."
)
allowed_error *= 2
else:
break
if d < 0:
get_logger().warning(
f"In scene {self.scene_name}, could not find a path from {position} to {object_type}"
f" with {allowed_error} error tolerance. Returning a distance of -1."
)
elif debug_log != "":
get_logger().debug(debug_log)
return d

return self.distance_cache.find_distance(
self.scene_name,
self.controller.last_event.events[agent_id].metadata["agent"]["position"],
object_type,
retry_dist,
)

@property
def currently_reachable_points(self) -> List[Dict[str, float]]:
"""List of {"x": x, "y": y, "z": z} locations in the scene that are
currently reachable."""
self.step({"action": "GetReachablePositions"})
return self.last_event.metadata["actionReturn"] # type:ignore

@property
def scene_name(self) -> str:
Expand Down Expand Up @@ -189,8 +307,12 @@ def start(
width=self._start_player_screen_width,
height=self._start_player_screen_height,
local_executable_path=self._local_thor_build,
snapToGrid=self._snap_to_grid,
quality=self._quality,
server_class=ai2thor.fifo_server.FifoServer,
gridSize=self._grid_size,
rotateStepDegrees=self._rotate_step_degrees,
visibilityDistance=self._visibility_distance,
)

if (
Expand Down Expand Up @@ -529,6 +651,17 @@ def currently_reachable_points(self) -> List[Dict[str, float]]:
self.step({"action": "GetReachablePositions"})
return self.last_event.metadata["actionReturn"] # type:ignore

def agent_state(self, agent_id: int = 0) -> Dict:
"""Return agent position, rotation and horizon."""
assert 0 <= agent_id < self.agent_count

agent_meta = self.last_event.events[agent_id].metadata["agent"]
return {
**{k: float(v) for k, v in agent_meta["position"].items()},
"rotation": {k: float(v) for k, v in agent_meta["rotation"].items()},
"horizon": round(float(agent_meta["cameraHorizon"]), 1),
}

def get_agent_location(self) -> Dict[str, Union[float, bool]]:
"""Gets agent's location."""
metadata = self.controller.last_event.metadata
Expand Down Expand Up @@ -728,6 +861,36 @@ def step(

return sr

def set_object_filter(self, object_ids: List[str]):
self.controller.step("SetObjectFilter", objectIds=object_ids, renderImage=False)

def reset_object_filter(self):
self.controller.step("ResetObjectFilter", renderImage=False)

def teleport(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not use the existing teleport_agent_to function?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried it at first but it was giving me a lot of warnings of this type
"Teleportation FAILED but agent still moved (position_dist {}, rot diff {})" So, I was not sure and copied the teleport function from Robothor environment.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gotcha, it would be nice if you could track down what the issue is, otherwise I would need to be convinced that it's worth having two functions that do roughly the same thing.

self,
position: Dict[str, float],
rotation: Dict[str, float],
horizon: float = 0.0,
):
try:
e = self.controller.step(
action="TeleportFull",
x=position["x"],
y=position["y"],
z=position["z"],
rotation=rotation,
horizon=horizon,
**self._extra_teleport_kwargs,
)
except ValueError as e:
if len(self._extra_teleport_kwargs) == 0:
self._extra_teleport_kwargs["standing"] = True
else:
raise e
return self.teleport(position=position, rotation=rotation, horizon=horizon)
return e.metadata["lastActionSuccess"]

@staticmethod
def position_dist(
p0: Mapping[str, Any],
Expand Down
Loading