Skip to content

Commit

Permalink
Addressed review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Srinath Krishnamachari <[email protected]>
  • Loading branch information
srinathk10 committed Oct 18, 2024
1 parent 34a4dcd commit 59602a9
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -390,27 +390,6 @@ def num_active_tasks(self) -> int:
"""
return len(self.get_active_tasks())

def num_alive_actors(self) -> int:
    """Report how many actors are currently alive.

    The progress bar uses this value for its alive-actor figure. This
    implementation always reports zero.
    """
    return 0

def num_pending_actors(self) -> int:
    """Report how many actors are currently pending.

    The progress bar uses this value for its pending-actor figure. This
    implementation always reports zero.
    """
    return 0

def num_restarting_actors(self) -> int:
    """Report how many actors are currently restarting.

    The progress bar uses this value for its restarting-actor figure. This
    implementation always reports zero.
    """
    return 0

def throttling_disabled(self) -> bool:
"""Whether to disable resource throttling for this operator.
Expand Down Expand Up @@ -530,3 +509,12 @@ def update_resource_usage(self) -> None:
restarting actors, retrying tasks, lost objects, etc.
"""
pass

def actor_info_progress_str(self) -> str:
    """Return the actor portion of the progress summary.

    This method is called from the summary_str API in OpState. Subclasses
    can override it to return progress strings for their alive, restarting
    and pending actors; this implementation contributes an empty string.
    """
    return ""
Original file line number Diff line number Diff line change
Expand Up @@ -305,27 +305,6 @@ def pending_processor_usage(self) -> ExecutionResources:
gpu=self._ray_remote_args.get("num_gpus", 0) * num_pending_workers,
)

def num_alive_actors(self) -> int:
    """Report how many actors in the actor pool are alive.

    The count is read from ``self._actor_pool`` and is used to display
    alive actor info in the progress bar.
    """
    pool = self._actor_pool
    return pool.num_alive_actors()

def num_pending_actors(self) -> int:
    """Report how many actors in the actor pool are pending.

    The count is read from ``self._actor_pool`` and is used to display
    pending actor info in the progress bar.
    """
    pool = self._actor_pool
    return pool.num_pending_actors()

def num_restarting_actors(self) -> int:
    """Report how many actors in the actor pool are restarting.

    The count is read from ``self._actor_pool`` and is used to display
    restarting actor info in the progress bar.
    """
    pool = self._actor_pool
    return pool.num_restarting_actors()

def incremental_resource_usage(self) -> ExecutionResources:
# Submitting tasks to existing actors doesn't require additional
# CPU/GPU resources.
Expand Down Expand Up @@ -382,6 +361,10 @@ def update_resource_usage(self) -> None:
else:
self._actor_pool.update_running_actor_state(actor, False)

def actor_info_progress_str(self) -> str:
    """Return the actor progress string produced by the actor pool.

    The text covering alive, restarting and pending actors is obtained
    from ``self._actor_pool`` and returned unchanged.
    """
    pool = self._actor_pool
    return pool.actor_info_progress_str()


class _MapWorker:
"""An actor worker for MapOperator."""
Expand Down Expand Up @@ -756,3 +739,17 @@ def _get_location(self, bundle: RefBundle) -> Optional[NodeIdStr]:
A node id associated with the bundle, or None if unknown.
"""
return bundle.get_cached_location()

def actor_info_progress_str(self) -> str:
    """Build the actor segment of the progress description.

    The total is the sum of alive, pending and restarting actors. When the
    total equals the alive count, only the total is shown; otherwise the
    total is followed by a per-state breakdown.
    """
    n_alive = self.num_alive_actors()
    n_pending = self.num_pending_actors()
    n_restarting = self.num_restarting_actors()
    n_total = n_alive + n_pending + n_restarting

    # Some actors are not alive yet: spell out the per-state counts.
    if n_total != n_alive:
        return (
            f"; Actors: {n_total} (alive {n_alive}, restarting {n_restarting}, "
            f"pending {n_pending})"
        )
    return f"; Actors: {n_total}"
1 change: 1 addition & 0 deletions python/ray/data/_internal/execution/resource_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ def update_usages(self):
for op, state in reversed(self._topology.items()):
# Update `self._op_usages`, `self._op_running_usages`,
# and `self._op_pending_usages`.
op.update_resource_usage()
op_usage = op.current_processor_usage()
op_running_usage = op.running_processor_usage()
op_pending_usage = op.pending_processor_usage()
Expand Down
1 change: 0 additions & 1 deletion python/ray/data/_internal/execution/streaming_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,6 @@ def _scheduling_loop_step(self, topology: Topology) -> bool:
i += 1
if i % PROGRESS_BAR_UPDATE_INTERVAL == 0:
self._refresh_progress_bars(topology)
op.update_resource_usage()
topology[op].dispatch_next_task()
self._resource_manager.update_usages()
op = select_operator_to_run(
Expand Down
16 changes: 1 addition & 15 deletions python/ray/data/_internal/execution/streaming_executor_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,20 +256,6 @@ def refresh_progress_bar(self, resource_manager: ResourceManager) -> None:
self.progress_bar.set_description(self.summary_str(resource_manager))
self.progress_bar.refresh()

def actor_info_progress_str(self) -> str:
    """Build the actor segment of the progress description from ``self.op``.

    The total is the sum of the operator's alive, pending and restarting
    actor counts. When the total equals the alive count, only the total is
    shown; otherwise the total is followed by a per-state breakdown.
    """
    operator = self.op
    n_alive = operator.num_alive_actors()
    n_pending = operator.num_pending_actors()
    n_restarting = operator.num_restarting_actors()
    n_total = n_alive + n_pending + n_restarting

    # Some actors are not alive yet: spell out the per-state counts.
    if n_total != n_alive:
        return (
            f"; Actors: {n_total} (alive {n_alive}, restarting {n_restarting}, "
            f"pending {n_pending})"
        )
    return f"; Actors: {n_total}"

def summary_str(self, resource_manager: ResourceManager) -> str:
# Active tasks
active = self.op.num_active_tasks()
Expand All @@ -281,7 +267,7 @@ def summary_str(self, resource_manager: ResourceManager) -> str:
desc += " [backpressured]"

# Actors info
desc += self.actor_info_progress_str()
desc += self.op.actor_info_progress_str()

# Queued blocks
queued = self.num_queued() + self.op.internal_queue_size()
Expand Down
6 changes: 6 additions & 0 deletions python/ray/data/tests/test_actor_pool_map_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,10 @@ def test_restarting_to_alive(self):
assert pool.num_active_actors() == 0
assert pool.num_idle_actors() == 1
assert pool.num_free_slots() == 1
assert (
pool.actor_info_progress_str()
== "; Actors: 1 (alive 0, restarting 1, pending 0)"
)

# Mark the actor as alive and test pick_actor succeeds
pool.update_running_actor_state(actor, False)
Expand All @@ -157,6 +161,7 @@ def test_restarting_to_alive(self):
assert pool.num_active_actors() == 1
assert pool.num_idle_actors() == 0
assert pool.num_free_slots() == 0
assert pool.actor_info_progress_str() == "; Actors: 1"

# Return the actor
pool.return_actor(picked_actor)
Expand All @@ -168,6 +173,7 @@ def test_restarting_to_alive(self):
assert pool.num_active_actors() == 0
assert pool.num_idle_actors() == 1
assert pool.num_free_slots() == 1
assert pool.actor_info_progress_str() == "; Actors: 1"

def test_repeated_picking(self):
# Test that we can repeatedly pick the same actor.
Expand Down

0 comments on commit 59602a9

Please sign in to comment.