Skip to content

Commit

Permalink
Addressed review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Srinath Krishnamachari <[email protected]>
  • Loading branch information
srinathk10 committed Oct 18, 2024
1 parent a5b2d55 commit 1b506b5
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -399,27 +399,6 @@ def num_active_tasks(self) -> int:
"""
return len(self.get_active_tasks())

def num_alive_actors(self) -> int:
"""Return the number of alive actors.
This method is used to display alive actor info in the progress bar.
"""
return 0

def num_pending_actors(self) -> int:
"""Return the number of pending actors.
This method is used to display pending actor info in the progress bar.
"""
return 0

def num_restarting_actors(self) -> int:
"""Return the number of restarting actors.
This method is used to display restarting actor info in the progress bar.
"""
return 0

def throttling_disabled(self) -> bool:
"""Whether to disable resource throttling for this operator.
Expand Down Expand Up @@ -539,3 +518,12 @@ def update_resource_usage(self) -> None:
restarting actors, retrying tasks, lost objects, etc.
"""
pass

def actor_info_progress_str(self) -> str:
"""Returns Actor progress strings for Alive, Restarting and Pending Actors.
This method will be called in summary_str API in OpState. Subcallses can
override it to return Actor progress strings for Alive, Restarting and Pending
Actors.
"""
return ""
Original file line number Diff line number Diff line change
Expand Up @@ -305,27 +305,6 @@ def pending_processor_usage(self) -> ExecutionResources:
gpu=self._ray_remote_args.get("num_gpus", 0) * num_pending_workers,
)

def num_alive_actors(self) -> int:
"""Return the number of alive actors.
This method is used to display alive actor info in the progress bar.
"""
return self._actor_pool.num_alive_actors()

def num_pending_actors(self) -> int:
"""Return the number of pending actors.
This method is used to display pending actor info in the progress bar.
"""
return self._actor_pool.num_pending_actors()

def num_restarting_actors(self) -> int:
"""Return the number of restarting actors.
This method is used to display restarting actor info in the progress bar.
"""
return self._actor_pool.num_restarting_actors()

def incremental_resource_usage(self) -> ExecutionResources:
# Submitting tasks to existing actors doesn't require additional
# CPU/GPU resources.
Expand Down Expand Up @@ -382,6 +361,10 @@ def update_resource_usage(self) -> None:
else:
self._actor_pool.update_running_actor_state(actor, False)

def actor_info_progress_str(self) -> str:
"""Returns Actor progress strings for Alive, Restarting and Pending Actors."""
return self._actor_pool.actor_info_progress_str()


class _MapWorker:
"""An actor worker for MapOperator."""
Expand Down Expand Up @@ -756,3 +739,17 @@ def _get_location(self, bundle: RefBundle) -> Optional[NodeIdStr]:
A node id associated with the bundle, or None if unknown.
"""
return bundle.get_cached_location()

def actor_info_progress_str(self) -> str:
"""Returns Actor progress strings for Alive, Restarting and Pending Actors."""
alive = self.num_alive_actors()
pending = self.num_pending_actors()
restarting = self.num_restarting_actors()
total = alive + pending + restarting
if total == alive:
return f"; Actors: {total}"
else:
return (
f"; Actors: {total} (alive {alive}, restarting {restarting}, "
f"pending {pending})"
)
1 change: 1 addition & 0 deletions python/ray/data/_internal/execution/resource_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ def update_usages(self):
for op, state in reversed(self._topology.items()):
# Update `self._op_usages`, `self._op_running_usages`,
# and `self._op_pending_usages`.
op.update_resource_usage()
op_usage = op.current_processor_usage()
op_running_usage = op.running_processor_usage()
op_pending_usage = op.pending_processor_usage()
Expand Down
1 change: 0 additions & 1 deletion python/ray/data/_internal/execution/streaming_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,6 @@ def _scheduling_loop_step(self, topology: Topology) -> bool:
i += 1
if i % PROGRESS_BAR_UPDATE_INTERVAL == 0:
self._refresh_progress_bars(topology)
op.update_resource_usage()
topology[op].dispatch_next_task()
self._resource_manager.update_usages()
op = select_operator_to_run(
Expand Down
16 changes: 1 addition & 15 deletions python/ray/data/_internal/execution/streaming_executor_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,20 +256,6 @@ def refresh_progress_bar(self, resource_manager: ResourceManager) -> None:
self.progress_bar.set_description(self.summary_str(resource_manager))
self.progress_bar.refresh()

def actor_info_progress_str(self) -> str:
# Alive/Pending/Restarting actors
alive = self.op.num_alive_actors()
pending = self.op.num_pending_actors()
restarting = self.op.num_restarting_actors()
total = alive + pending + restarting
if total == alive:
return f"; Actors: {total}"
else:
return (
f"; Actors: {total} (alive {alive}, restarting {restarting}, "
f"pending {pending})"
)

def summary_str(self, resource_manager: ResourceManager) -> str:
# Active tasks
active = self.op.num_active_tasks()
Expand All @@ -281,7 +267,7 @@ def summary_str(self, resource_manager: ResourceManager) -> str:
desc += " [backpressured]"

# Actors info
desc += self.actor_info_progress_str()
desc += self.op.actor_info_progress_str()

# Queued blocks
queued = self.num_queued() + self.op.internal_queue_size()
Expand Down
12 changes: 9 additions & 3 deletions python/ray/data/tests/test_actor_pool_map_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,10 @@ def test_restarting_to_alive(self):
assert pool.num_active_actors() == 0
assert pool.num_idle_actors() == 1
assert pool.num_free_slots() == 1
assert (
pool.actor_info_progress_str()
== "; Actors: 1 (alive 0, restarting 1, pending 0)"
)

# Mark the actor as alive and test pick_actor succeeds
pool.update_running_actor_state(actor, False)
Expand All @@ -157,6 +161,7 @@ def test_restarting_to_alive(self):
assert pool.num_active_actors() == 1
assert pool.num_idle_actors() == 0
assert pool.num_free_slots() == 0
assert pool.actor_info_progress_str() == "; Actors: 1"

# Return the actor
pool.return_actor(picked_actor)
Expand All @@ -168,6 +173,7 @@ def test_restarting_to_alive(self):
assert pool.num_active_actors() == 0
assert pool.num_idle_actors() == 1
assert pool.num_free_slots() == 1
assert pool.actor_info_progress_str() == "; Actors: 1"

def test_repeated_picking(self):
# Test that we can repeatedly pick the same actor.
Expand Down Expand Up @@ -570,7 +576,7 @@ def test_locality_manager_busyness_ranking(self):
assert res3 is None


def test_start_actor_timeout(ray_start_regular_shared, restore_data_context):
def test_start_actor_timeout(ray_start_regular, restore_data_context):
"""Tests that ActorPoolMapOperator raises an exception on
timeout while waiting for actors."""

Expand Down Expand Up @@ -602,7 +608,6 @@ def __call__(self, x):
def test_actor_pool_fault_tolerance_e2e(ray_start_cluster, restore_data_context):
"""Test that a dataset with actor pools can finish, when
all nodes in the cluster are removed and added back."""
ray.shutdown()
cluster = ray_start_cluster
cluster.add_node(num_cpus=0)
ray.init()
Expand Down Expand Up @@ -644,7 +649,7 @@ async def wait_for_nodes_restarted(self):
signal_actor = Signal.remote()

# Spin up nodes
num_nodes = 4
num_nodes = 2
nodes = []
for _ in range(num_nodes):
nodes.append(cluster.add_node(num_cpus=10, num_gpus=1))
Expand Down Expand Up @@ -681,6 +686,7 @@ def run_dataset():
fn_constructor_args=[signal_actor],
concurrency=num_nodes,
batch_size=1,
num_gpus=1,
)
res = ds.take_all()

Expand Down

0 comments on commit 1b506b5

Please sign in to comment.