Skip to content

Commit

Permalink
Addressed review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Srinath Krishnamachari <[email protected]>
  • Loading branch information
srinathk10 committed Oct 11, 2024
1 parent 766db7d commit 6c0de82
Show file tree
Hide file tree
Showing 3 changed files with 165 additions and 162 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -211,16 +211,13 @@ def _dispatch_tasks(self):
).remote(DataContext.get_current(), ctx, *input_blocks)

def _task_done_callback(actor_to_return):
if actor_to_return in self._actor_pool._num_tasks_in_flight:
# Return the actor that was running the task to the pool.
self._actor_pool.return_actor(actor_to_return)
else:
assert (
actor_to_return.get_location
in self._actor_pool._restarting_actors
)
# Move the actor from restarting to running state.
self._actor_pool.restarting_to_running(actor_to_return)
# If actor is found in restarting, move it to running.
self._actor_pool.restarting_to_running(
actor, actor.get_location.remote()
)

# Return the actor that was running the task to the pool.
self._actor_pool.return_actor(actor_to_return)

# Dispatch more tasks.
self._dispatch_tasks()
Expand Down Expand Up @@ -363,19 +360,27 @@ def _apply_default_remote_args(ray_remote_args: Dict[str, Any]) -> Dict[str, Any
def get_autoscaling_actor_pools(self) -> List[AutoscalingActorPool]:
return [self._actor_pool]

def _manage_actor_restarting_state(self, actor):
    """Sync the actor pool's bookkeeping with ``actor``'s current state.

    Reads the actor's locally known state and asks the pool to move the
    actor between its running and restarting bookkeeping accordingly.

    Args:
        actor: Handle of the pool actor to inspect.

    Raises:
        AssertionError: If the actor is neither ALIVE nor RESTARTING.
    """
    actor_state = actor._get_local_state()
    if actor_state != gcs_pb2.ActorTableData.ActorState.ALIVE:
        # If an actor is not ALIVE, it's a candidate to be marked as a
        # restarting actor. Compare by value, like the check above: an
        # identity (`is`) test on an enum number only passes by accident
        # of integer caching.
        assert actor_state == gcs_pb2.ActorTableData.ActorState.RESTARTING, (
            f"Unexpected actor state: {actor_state}"
        )
        if actor in self._actor_pool._num_tasks_in_flight:
            # Change Actor state from running to restarting
            self._actor_pool.running_to_restarting(
                actor, actor.get_location.remote()
            )
    else:
        # If an actor is ALIVE, it's a candidate to be marked as a
        # running actor, if not already the case.
        # NOTE(review): this submits a get_location call even when the
        # actor is not tracked as restarting -- confirm that is intended.
        self._actor_pool.restarting_to_running(actor, actor.get_location.remote())

def update_resource_usage(self) -> None:
"""Updates resources usage."""
actors = list(self._actor_pool._num_tasks_in_flight.keys())
for actor in actors:
actor_state = actor._get_local_state()
if actor_state != gcs_pb2.ActorTableData.ActorState.ALIVE:
# If an actor is not ALIVE, it's a candidate to be marked as a
# restarting actor.
self._actor_pool.running_to_restarting(actor, actor.get_location)
else:
# If an actor is ALIVE, it's a candidate to be marked as a
# running actor, if not already the case.
self._actor_pool.restarting_to_running(actor.get_location)
self._manage_actor_restarting_state(actor)


class _MapWorker:
Expand Down Expand Up @@ -440,12 +445,14 @@ def __init__(

# Number of tasks in flight per actor.
self._num_tasks_in_flight: Dict[ray.actor.ActorHandle, int] = {}
# Number of tasks in flight per restarting actor.
self._num_tasks_restarting_actors: Dict[ray.actor.ActorHandle, int] = {}
# Node id of each ready actor.
self._actor_locations: Dict[ray.actor.ActorHandle, str] = {}
# Node id of each restarting actor.
self._restarting_actor_locations: Dict[ray.actor.ActorHandle, str] = {}
# Actors that are not yet ready (still pending creation).
self._pending_actors: Dict[ObjectRef, ray.actor.ActorHandle] = {}
# Actors that are restarting.
self._restarting_actors: Dict[ObjectRef, ray.actor.ActorHandle] = {}
# Whether actors that become idle should be eagerly killed. This is False until
# the first call to kill_idle_actors().
self._should_kill_idle_actors = False
Expand Down Expand Up @@ -477,7 +484,7 @@ def num_pending_actors(self) -> int:
return len(self._pending_actors)

def num_restarting_actors(self) -> int:
return len(self._restarting_actors)
return len(self._num_tasks_restarting_actors)

def max_tasks_in_flight_per_actor(self) -> int:
return self._max_tasks_in_flight
Expand Down Expand Up @@ -551,26 +558,32 @@ def running_to_restarting(
if actor not in self._num_tasks_in_flight:
# The actor has been removed from the pool before becoming restarting.
return False
self._num_tasks_restarting_actors[actor] = self._num_tasks_in_flight[actor]
self._num_tasks_in_flight[actor] = 0
self._restarting_actor_locations[actor] = ray.get(ready_ref)
self._remove_actor(actor)
self._restarting_actors[ready_ref] = actor
return True

def restarting_to_running(self, ready_ref: ray.ObjectRef) -> bool:
"""Mark the actor corresponding to the provided ready future as running, making
the actor pickable.
def restarting_to_running(
self, actor: ray.actor.ActorHandle, ready_ref: ray.ObjectRef
) -> bool:
"""Mark the actor as running, making the actor pickable.
Args:
ready_ref: The ready future for the actor that we wish to mark as running.
actor: The restarting actor to add as running to the pool.
ready_ref: The ready future for the actor that we wish to mark as
running.
Returns:
Whether the actor was still restarting. This can return False if the actor
had already been killed.
"""
if ready_ref not in self._restarting_actors:
if actor not in self._restarting_actor_locations:
# The actor has been removed from the pool before becoming running.
return False
actor = self._restarting_actors.pop(ready_ref)
self._num_tasks_in_flight[actor] = 0
self._restarting_actor_locations.pop(actor)
self._num_tasks_in_flight[actor] = self._num_tasks_restarting_actors[actor]
self._num_tasks_restarting_actors[actor] = 0
self._actor_locations[actor] = ray.get(ready_ref)
return True

Expand Down Expand Up @@ -630,9 +643,6 @@ def return_actor(self, actor: ray.actor.ActorHandle):
def get_pending_actor_refs(self) -> List[ray.ObjectRef]:
return list(self._pending_actors.keys())

def get_restarting_actor_refs(self) -> List[ray.ObjectRef]:
return list(self._restarting_actors.keys())

def num_idle_actors(self) -> int:
"""Return the number of idle actors in the pool."""
return sum(
Expand Down Expand Up @@ -679,13 +689,14 @@ def _maybe_kill_pending_actor(self) -> bool:
return False

def _maybe_kill_restarting_actor(self) -> bool:
if self._restarting_actors:
# At least one restarting actor, so kill first one.
ready_ref = next(iter(self._restarting_actors.keys()))
self._remove_actor(self._restarting_actors[ready_ref])
del self._restarting_actors[ready_ref]
return True
# No restarting actors, so indicate to the caller that no actors were killed.
for actor in self._restarting_actor_locations.keys():
if self._num_tasks_restarting_actors[actor] == 0:
# Found a restarting actor with no tasks in flight, so kill it.
self._remove_actor(actor)
del self._restarting_actor_locations[actor]
return True
# No candidate restarting actors, so indicate to the caller that no actors were
# killed.
return False

def _maybe_kill_idle_actor(self) -> bool:
Expand Down Expand Up @@ -724,9 +735,9 @@ def _kill_all_pending_actors(self):
self._pending_actors.clear()

def _kill_all_restarting_actors(self):
for _, actor in self._restarting_actors.items():
for actor in self._restarting_actor_locations.keys():
self._remove_actor(actor)
self._restarting_actors.clear()
self._restarting_actor_locations.clear()

def _kill_all_idle_actors(self):
idle_actors = [
Expand Down
120 changes: 0 additions & 120 deletions python/ray/data/tests/test_actor_pool_fault_tolerance.py

This file was deleted.

Loading

0 comments on commit 6c0de82

Please sign in to comment.