diff --git a/python/ray/data/_internal/execution/interfaces/physical_operator.py b/python/ray/data/_internal/execution/interfaces/physical_operator.py
index 60f7514f6c2bb..361b18babf6c7 100644
--- a/python/ray/data/_internal/execution/interfaces/physical_operator.py
+++ b/python/ray/data/_internal/execution/interfaces/physical_operator.py
@@ -399,27 +399,6 @@ def num_active_tasks(self) -> int:
         """
         return len(self.get_active_tasks())
 
-    def num_alive_actors(self) -> int:
-        """Return the number of alive actors.
-
-        This method is used to display alive actor info in the progress bar.
-        """
-        return 0
-
-    def num_pending_actors(self) -> int:
-        """Return the number of pending actors.
-
-        This method is used to display pending actor info in the progress bar.
-        """
-        return 0
-
-    def num_restarting_actors(self) -> int:
-        """Return the number of restarting actors.
-
-        This method is used to display restarting actor info in the progress bar.
-        """
-        return 0
-
     def throttling_disabled(self) -> bool:
         """Whether to disable resource throttling for this operator.
 
@@ -539,3 +518,12 @@ def update_resource_usage(self) -> None:
         restarting actors, retrying tasks, lost objects, etc.
         """
         pass
+
+    def actor_info_progress_str(self) -> str:
+        """Return the progress string for alive, restarting, and pending actors.
+
+        This method is called by the summary_str API in OpState. Subclasses can
+        override it to report their alive, restarting, and pending actor counts
+        in the progress bar.
+        """
+        return ""
diff --git a/python/ray/data/_internal/execution/operators/actor_pool_map_operator.py b/python/ray/data/_internal/execution/operators/actor_pool_map_operator.py
index f550ff039b841..987fc1c118957 100644
--- a/python/ray/data/_internal/execution/operators/actor_pool_map_operator.py
+++ b/python/ray/data/_internal/execution/operators/actor_pool_map_operator.py
@@ -305,27 +305,6 @@ def pending_processor_usage(self) -> ExecutionResources:
             gpu=self._ray_remote_args.get("num_gpus", 0) * num_pending_workers,
         )
 
-    def num_alive_actors(self) -> int:
-        """Return the number of alive actors.
-
-        This method is used to display alive actor info in the progress bar.
-        """
-        return self._actor_pool.num_alive_actors()
-
-    def num_pending_actors(self) -> int:
-        """Return the number of pending actors.
-
-        This method is used to display pending actor info in the progress bar.
-        """
-        return self._actor_pool.num_pending_actors()
-
-    def num_restarting_actors(self) -> int:
-        """Return the number of restarting actors.
-
-        This method is used to display restarting actor info in the progress bar.
-        """
-        return self._actor_pool.num_restarting_actors()
-
     def incremental_resource_usage(self) -> ExecutionResources:
         # Submitting tasks to existing actors doesn't require additional
         # CPU/GPU resources.
@@ -382,6 +361,10 @@ def update_resource_usage(self) -> None:
         else:
             self._actor_pool.update_running_actor_state(actor, False)
 
+    def actor_info_progress_str(self) -> str:
+        """Return the progress string for alive, restarting, and pending actors."""
+        return self._actor_pool.actor_info_progress_str()
+
 
 class _MapWorker:
     """An actor worker for MapOperator."""
@@ -756,3 +739,17 @@ def _get_location(self, bundle: RefBundle) -> Optional[NodeIdStr]:
             A node id associated with the bundle, or None if unknown.
         """
         return bundle.get_cached_location()
+
+    def actor_info_progress_str(self) -> str:
+        """Return the progress string for alive, restarting, and pending actors."""
+        alive = self.num_alive_actors()
+        pending = self.num_pending_actors()
+        restarting = self.num_restarting_actors()
+        total = alive + pending + restarting
+        if total == alive:
+            return f"; Actors: {total}"
+        else:
+            return (
+                f"; Actors: {total} (alive {alive}, restarting {restarting}, "
+                f"pending {pending})"
+            )
diff --git a/python/ray/data/_internal/execution/resource_manager.py b/python/ray/data/_internal/execution/resource_manager.py
index 60cf3c4e71843..32b23443c7ed4 100644
--- a/python/ray/data/_internal/execution/resource_manager.py
+++ b/python/ray/data/_internal/execution/resource_manager.py
@@ -127,6 +127,7 @@ def update_usages(self):
         for op, state in reversed(self._topology.items()):
             # Update `self._op_usages`, `self._op_running_usages`,
             # and `self._op_pending_usages`.
+            op.update_resource_usage()
             op_usage = op.current_processor_usage()
             op_running_usage = op.running_processor_usage()
             op_pending_usage = op.pending_processor_usage()
diff --git a/python/ray/data/_internal/execution/streaming_executor.py b/python/ray/data/_internal/execution/streaming_executor.py
index aa78e0903d69c..238f6f9421cc9 100644
--- a/python/ray/data/_internal/execution/streaming_executor.py
+++ b/python/ray/data/_internal/execution/streaming_executor.py
@@ -309,7 +309,6 @@ def _scheduling_loop_step(self, topology: Topology) -> bool:
             i += 1
             if i % PROGRESS_BAR_UPDATE_INTERVAL == 0:
                 self._refresh_progress_bars(topology)
-            op.update_resource_usage()
             topology[op].dispatch_next_task()
             self._resource_manager.update_usages()
             op = select_operator_to_run(
diff --git a/python/ray/data/_internal/execution/streaming_executor_state.py b/python/ray/data/_internal/execution/streaming_executor_state.py
index 018c50066a8d3..9f44a0f6cc7a2 100644
--- a/python/ray/data/_internal/execution/streaming_executor_state.py
+++ b/python/ray/data/_internal/execution/streaming_executor_state.py
@@ -256,20 +256,6 @@ def refresh_progress_bar(self, resource_manager: ResourceManager) -> None:
             self.progress_bar.set_description(self.summary_str(resource_manager))
             self.progress_bar.refresh()
 
-    def actor_info_progress_str(self) -> str:
-        # Alive/Pending/Restarting actors
-        alive = self.op.num_alive_actors()
-        pending = self.op.num_pending_actors()
-        restarting = self.op.num_restarting_actors()
-        total = alive + pending + restarting
-        if total == alive:
-            return f"; Actors: {total}"
-        else:
-            return (
-                f"; Actors: {total} (alive {alive}, restarting {restarting}, "
-                f"pending {pending})"
-            )
-
     def summary_str(self, resource_manager: ResourceManager) -> str:
         # Active tasks
         active = self.op.num_active_tasks()
@@ -281,7 +267,7 @@ def summary_str(self, resource_manager: ResourceManager) -> str:
             desc += " [backpressured]"
 
         # Actors info
-        desc += self.actor_info_progress_str()
+        desc += self.op.actor_info_progress_str()
 
         # Queued blocks
         queued = self.num_queued() + self.op.internal_queue_size()
diff --git a/python/ray/data/tests/test_actor_pool_map_operator.py b/python/ray/data/tests/test_actor_pool_map_operator.py
index e252a2993b8f4..2e5c09d0bbebe 100644
--- a/python/ray/data/tests/test_actor_pool_map_operator.py
+++ b/python/ray/data/tests/test_actor_pool_map_operator.py
@@ -144,6 +144,10 @@ def test_restarting_to_alive(self):
         assert pool.num_active_actors() == 0
         assert pool.num_idle_actors() == 1
         assert pool.num_free_slots() == 1
+        assert (
+            pool.actor_info_progress_str()
+            == "; Actors: 1 (alive 0, restarting 1, pending 0)"
+        )
 
         # Mark the actor as alive and test pick_actor succeeds
         pool.update_running_actor_state(actor, False)
@@ -157,6 +161,7 @@
         assert pool.num_active_actors() == 1
         assert pool.num_idle_actors() == 0
         assert pool.num_free_slots() == 0
+        assert pool.actor_info_progress_str() == "; Actors: 1"
 
         # Return the actor
         pool.return_actor(picked_actor)
@@ -168,6 +173,7 @@
         assert pool.num_active_actors() == 0
         assert pool.num_idle_actors() == 1
         assert pool.num_free_slots() == 1
+        assert pool.actor_info_progress_str() == "; Actors: 1"
 
     def test_repeated_picking(self):
         # Test that we can repeatedly pick the same actor.
@@ -570,7 +576,7 @@ def test_locality_manager_busyness_ranking(self):
     assert res3 is None
 
 
-def test_start_actor_timeout(ray_start_regular_shared, restore_data_context):
+def test_start_actor_timeout(ray_start_regular, restore_data_context):
     """Tests that ActorPoolMapOperator raises an exception on timeout
     while waiting for actors."""
 
@@ -602,7 +608,6 @@
 def test_actor_pool_fault_tolerance_e2e(ray_start_cluster, restore_data_context):
     """Test that a dataset with actor pools can finish, when all nodes in the
     cluster are removed and added back."""
-    ray.shutdown()
     cluster = ray_start_cluster
     cluster.add_node(num_cpus=0)
     ray.init()
@@ -644,7 +649,7 @@ async def wait_for_nodes_restarted(self):
     signal_actor = Signal.remote()
 
     # Spin up nodes
-    num_nodes = 4
+    num_nodes = 2
     nodes = []
     for _ in range(num_nodes):
         nodes.append(cluster.add_node(num_cpus=10, num_gpus=1))
@@ -681,6 +686,7 @@ def run_dataset():
         fn_constructor_args=[signal_actor],
         concurrency=num_nodes,
         batch_size=1,
+        num_gpus=1,
     )
 
     res = ds.take_all()