Mark restarting actors as pending actors #47946

Open: srinathk10 wants to merge 20 commits into master from srinathk10-restart-actor-mark-pending

Conversation

@srinathk10 commented Oct 8, 2024

Why are these changes needed?

ActorPoolMapOperator executes tasks on an actor pool. To schedule an incoming task, it invokes pick_actor, a simple bin-packing algorithm that picks the running actor with the fewest in-flight tasks. When an actor is restarting, however, pick_actor needs to exclude it from task scheduling.
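
For illustration, here is a minimal, self-contained sketch of that scheduling rule with the restarting exclusion added (simplified names, not the actual ActorPoolMapOperator code):

from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class ActorState:
    num_tasks_in_flight: int
    is_restarting: bool


def pick_actor(
    running_actors: Dict[str, ActorState], max_tasks_in_flight: int
) -> Optional[str]:
    """Pick the non-restarting actor with the fewest in-flight tasks."""
    best: Optional[str] = None
    for actor, state in running_actors.items():
        if state.is_restarting:
            continue  # the change in this PR: skip restarting actors
        if state.num_tasks_in_flight >= max_tasks_in_flight:
            continue  # actor is at capacity
        if (
            best is None
            or state.num_tasks_in_flight < running_actors[best].num_tasks_in_flight
        ):
            best = actor
    return best  # None if every actor is at capacity or restarting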

Related issue number

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

@srinathk10 force-pushed the srinathk10-restart-actor-mark-pending branch 5 times, most recently from 8e4578a to 766db7d on October 10, 2024 03:42
in self._actor_pool._restarting_actors
)
# Move the actor from restarting to running state.
self._actor_pool.restarting_to_running(actor_to_return)
Contributor:

We need to return the actor in this case as well. Otherwise, the actor will no longer be usable.
BTW, let's add a unit test in test_actor_pool_map_operator.py to cover this case.
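
For illustration, the fixed callback might look roughly like this (a sketch based on the snippets quoted in this thread; it is a nested callback that closes over self, and return_actor is assumed to be the pool's return method):

def _task_done_callback(actor_to_return):
    # If the actor was marked restarting while the task ran, move it
    # back to the running state first.
    if actor_to_return in self._actor_pool._restarting_actors:
        # Move the actor from restarting to running state.
        self._actor_pool.restarting_to_running(actor_to_return)
    # Return the actor in both cases; otherwise it is no longer usable
    # for future tasks.
    self._actor_pool.return_actor(actor_to_return)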

@@ -221,7 +231,7 @@ def _task_done_callback(actor_to_return):
self._submit_data_task(
gen,
bundle,
-    lambda: _task_done_callback(actor_to_return),
+    lambda: _task_done_callback(actor_to_return),  # noqa: B023
Contributor:

this change seems unrelated?

Author:

Yes, this was suppressing a warning from ./scripts/format.sh.
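
For context, B023 is flake8-bugbear's warning about closures created in a loop capturing the loop variable late. A standalone illustration of the behavior it guards against:

callbacks = []
for i in range(3):
    callbacks.append(lambda: print(i))  # B023: closes over `i` late
for cb in callbacks:
    cb()  # prints 2, 2, 2

callbacks = []
for i in range(3):
    callbacks.append(lambda i=i: print(i))  # binds the current value eagerly
for cb in callbacks:
    cb()  # prints 0, 1, 2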

actors = list(self._actor_pool._num_tasks_in_flight.keys())
for actor in actors:
actor_state = actor._get_local_state()
if actor_state != gcs_pb2.ActorTableData.ActorState.ALIVE:
Contributor:

let's add an assertion to make sure we are only handling the RESTARTING state here.
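
A sketch of the suggested assertion (the assumption that RESTARTING is the only expected non-ALIVE state here comes from this thread, not from the final code):

from ray.core.generated import gcs_pb2

if actor_state != gcs_pb2.ActorTableData.ActorState.ALIVE:
    # Make the expectation explicit before treating the actor as restarting.
    assert actor_state == gcs_pb2.ActorTableData.ActorState.RESTARTING, actor_state
    self._actor_pool.running_to_restarting(actor, actor.get_location.remote())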

if actor_state != gcs_pb2.ActorTableData.ActorState.ALIVE:
# If an actor is not ALIVE, it's a candidate to be marked as a
# restarting actor.
self._actor_pool.running_to_restarting(actor, actor.get_location)
Contributor:

Suggested change:
-    self._actor_pool.running_to_restarting(actor, actor.get_location)
+    if self._actor_pool.is_actor_running(actor):
+        self._actor_pool.running_to_restarting(actor, actor.get_location.remote())
  • Moving the running check here would be cleaner. More importantly, we should only send get_location when the actor switched from running to restarting.
  • .remote() was missed after get_location. It probably didn't error out because actor locality is disabled by default right now (see the sketch below).
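
A generic illustration of the second point (not the operator code; the Worker class and its method body are made up for the example):

import ray

@ray.remote
class Worker:
    def get_location(self) -> str:
        return ray.get_runtime_context().get_node_id()

w = Worker.remote()
m = w.get_location             # a remote-method wrapper; nothing is called
ref = w.get_location.remote()  # submits the call and returns an ObjectRef
print(ray.get(ref))            # resolves to the node ID string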

Author:

Ah OK. get_location is valid only when the state is ALIVE.

else:
# If an actor is ALIVE, it's a candidate to be marked as a
# running actor, if not already the case.
self._actor_pool.restarting_to_running(actor.get_location)
Contributor:

We should use the actor handle as the key; actor.get_location is a method, not an object ref.

Contributor:

Similar to the above comment, let's add some unit test to cover the state transitions.

from ray.tests.conftest import * # noqa


def test_removed_nodes_and_added_back(ray_start_cluster):
Contributor:

  • let's also test that pending_processor_usage reports the correct usage during different stages.
  • maybe just move this test to test_actor_pool_map_operator.py.

# The actor has been removed from the pool before becoming running.
return False
actor = self._restarting_actors.pop(ready_ref)
self._num_tasks_in_flight[actor] = 0
Contributor:

We need to keep the old _num_tasks_in_flight count and restore it here.
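
A simplified sketch of the state transition with the count preserved (plain dicts stand in for the pool's internal structures):

num_tasks_in_flight = {}  # actor -> in-flight task count
restarting_actors = {}    # ready_ref -> (actor, saved in-flight count)


def running_to_restarting(actor, ready_ref):
    # Save the old count instead of discarding it.
    restarting_actors[ready_ref] = (actor, num_tasks_in_flight.pop(actor))


def restarting_to_running(ready_ref):
    if ready_ref not in restarting_actors:
        # The actor has been removed from the pool before becoming running.
        return False
    actor, saved = restarting_actors.pop(ready_ref)
    num_tasks_in_flight[actor] = saved  # restore, rather than resetting to 0
    return True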

Author:

Ah I see, makes sense

return True

# Next prioritize killing restarting actor.
killed = self._maybe_kill_restarting_actor()
Contributor:

let's only keep restarting actors with in_flight_tasks = 0 here.
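
Reusing the simplified structures from the sketch above, the suggested filter might look like:

import ray


def maybe_kill_restarting_actor():
    # Only restarting actors with zero in-flight tasks are kill candidates.
    for ready_ref, (actor, saved) in list(restarting_actors.items()):
        if saved == 0:
            del restarting_actors[ready_ref]
            ray.kill(actor)  # `actor` is assumed to be a Ray actor handle
            return True
    return False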

@srinathk10 force-pushed the srinathk10-restart-actor-mark-pending branch 2 times, most recently from 6c0de82 to 0c6ee88 on October 11, 2024 19:46
@srinathk10 marked this pull request as ready for review October 11, 2024 20:24
@srinathk10 force-pushed the srinathk10-restart-actor-mark-pending branch 4 times, most recently from 766ba8c to 3969db2 on October 15, 2024 19:48
@raulchen (Contributor) left a comment:

High-level structure looks good. Left some comments

else:
# If an actor is ALIVE, it's a candidate to be marked as a
# running actor, if not already the case.
self._actor_pool.clear_restarting_from_running_actor(actor)
Contributor:

nit, the method names sound a bit too verbose. maybe just mark_actor_as_alive/restarting?

@@ -309,6 +309,7 @@ def _scheduling_loop_step(self, topology: Topology) -> bool:
i += 1
if i % PROGRESS_BAR_UPDATE_INTERVAL == 0:
self._refresh_progress_bars(topology)
topology[op].update_resource_usage()
Contributor:

just op.update_resource_usage() so we don't need the extra indirection in OpState

self._num_tasks_in_flight[actor] -= 1
if self._should_kill_idle_actors and self._num_tasks_in_flight[actor] == 0:
# Mark restarting as false, now that the actor in running
self._running_actors[actor]._is_restarting = False
Contributor:

On second thought, I think it'd be slightly clearer to remove this and let the next update_resource_usage handle the state transition.

@srinathk10 force-pushed the srinathk10-restart-actor-mark-pending branch from 4c6b6c6 to 1f6474f on October 16, 2024 04:48
Comment on lines 486 to 494
def num_alive_actors(self) -> int:
return sum(
1
if (
running_actor_state.num_tasks_in_flight > 0
and running_actor_state.is_restarting is False
)
else 0
for running_actor_state in self._running_actors.values()
)
@bveeramani (Member) commented Oct 16, 2024:

Does this mean we don't count an actor as alive if it doesn't have any tasks in flight? If so, what're the implications of that (if any)?

Author:

For scheduling tasks, we invoke pick_actor(). Earlier, it did not cover the restarting case; now it excludes restarting actors even when they have few in-flight tasks.

Also, the resource accounting APIs current_processor_usage() and pending_processor_usage() now account for restarting actors.
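
A simplified sketch of that accounting, using the names from the snippets in this thread (num_cpus_per_actor and the per-actor state objects are assumptions):

def current_processor_usage(running_actors, num_cpus_per_actor):
    # Restarting actors are excluded from current usage ...
    return sum(
        num_cpus_per_actor
        for state in running_actors.values()
        if not state.is_restarting
    )


def pending_processor_usage(running_actors, num_cpus_per_actor):
    # ... and reported as pending instead, per this PR's title.
    return sum(
        num_cpus_per_actor
        for state in running_actors.values()
        if state.is_restarting
    )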

Comment on lines 366 to 367
actor_state = actor._get_local_state()
if actor_state != gcs_pb2.ActorTableData.ActorState.ALIVE:
Member:

What happens if _get_local_state returns None? Looks like we assume that the actor is restarting -- do we need to worry about this edge case?

Author:

If raylet.pyx:4371 get_local_actor_state() returns None, then actor.py:1561 _get_local_state() can return None.

I think it's defensive to check for None here, given that I'm not sure about the interface guarantee for get_local_actor_state().

Good catch! Let me fix the code.
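
One possible shape for the defensive check (mark_restarting and mark_running are placeholders for the pool transitions; the actual fix may differ):

for actor in actors:
    actor_state = actor._get_local_state()
    if actor_state is None:
        # The local state is unknown; skip this actor rather than
        # guessing that it is restarting.
        continue
    if actor_state != gcs_pb2.ActorTableData.ActorState.ALIVE:
        mark_restarting(actor)
    else:
        mark_running(actor)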


def update_resource_usage(self) -> None:
"""Updates resources usage."""
for actor in self._actor_pool._running_actors.keys():
Member:

Nit: Should we add a method to _ActorPool that provides a list of actor handles so that we don't access the internal _running_actors attribute?

Comment on lines 344 to 347
def update_resource_usage(self) -> None:
"""Updates resources usage."""
self.op.update_resource_usage()

Member:

Where is OpState.update_resource_usage called?

Author:

This is invoked by _scheduling_loop_step in streaming_executor.py. Will add a comment here.

Member:

I thought that calls PhysicalOperator.update_resource_usage and not OpState.update_resource_usage? AFAIK OpState.update_resource_usage doesn't have any references

Member:

Ah, nevermind, saw you removed OpState.update_resource_usage

self._running_actors[actor].num_tasks_in_flight >= self._max_tasks_in_flight
or self._running_actors[actor].is_restarting
):
# All actors are at capacity or restarting.
Contributor:

nit, not a new issue in this PR, but I think it'd be clearer if we filtered the running actors by validity and then found the min (see the sketch below).
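
Using the simplified names from the earlier pick_actor sketch, the filter-then-min shape might look like:

valid = [
    actor
    for actor, state in running_actors.items()
    if not state.is_restarting
    and state.num_tasks_in_flight < max_tasks_in_flight
]
picked = (
    min(valid, key=lambda a: running_actors[a].num_tasks_in_flight)
    if valid
    else None  # all actors are at capacity or restarting
)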

@anyscalesam added the triage (Needs triage) and core (Issues that should be addressed in Ray Core) labels Oct 16, 2024
@srinathk10 force-pushed the srinathk10-restart-actor-mark-pending branch from 1085e80 to caae0a5 on October 17, 2024 05:26
def test_actor_pool_fault_tolerance_e2e(ray_start_cluster):
"""Test that a dataset with actor pools can finish, when
all nodes in the cluster are removed and added back."""
ray.shutdown()
Contributor:

this shutdown shouldn't be needed

@srinathk10 (Author) commented Oct 18, 2024:

Without this shutdown, ray.init() at line 608 throws:

E RuntimeError: Maybe you called ray.init twice by accident? This error can be suppressed by passing in 'ignore_reinit_error=True' or by calling 'ray.shutdown()' prior to 'ray.init()'.

Contributor:

I think it's because the previous test didn't shut down the cluster. We can change ray_start_regular_shared to ray_start_regular.

@raulchen (Contributor) left a comment:

LGTM. A few final small comments.

actor_str += f", (pending: {pending})"
desc += actor_str
# Actors info
desc += self.actor_info_progress_str()
Contributor:

Oh, sorry, I meant just adding this actor_info_progress_str to PhysicalOperator and getting rid of the num_xxx_actors methods, because it seems a bit overkill to have so many methods and indirections (a rough sketch follows).
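
A rough sketch of such a consolidated helper (the exact format strings are assumptions based on the snippet above):

def actor_info_progress_str(running, restarting, pending):
    # One string for the progress bar instead of separate
    # num_xxx_actors accessors.
    s = f"; Actors: {running}"
    if restarting:
        s += f" (restarting: {restarting})"
    if pending:
        s += f", (pending: {pending})"
    return s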

@@ -309,6 +309,7 @@ def _scheduling_loop_step(self, topology: Topology) -> bool:
i += 1
if i % PROGRESS_BAR_UPDATE_INTERVAL == 0:
self._refresh_progress_bars(topology)
op.update_resource_usage()
Contributor:

let's move this call inside ResourceManager.update_usages(), because the updated info will be used in that function.

@srinathk10 force-pushed the srinathk10-restart-actor-mark-pending branch from 59602a9 to 86954bf on October 18, 2024 20:28
@srinathk10 force-pushed the srinathk10-restart-actor-mark-pending branch from 86954bf to a2aac45 on October 18, 2024 21:02
srinathk10 and others added 19 commits October 18, 2024 14:05
Signed-off-by: Srinath Krishnamachari <[email protected]>
Signed-off-by: Srinath Krishnamachari <[email protected]>
This reverts commit ad9070d.

Signed-off-by: Srinath Krishnamachari <[email protected]>
Signed-off-by: Srinath Krishnamachari <[email protected]>
Signed-off-by: Srinath Krishnamachari <[email protected]>
…perator.py

Co-authored-by: Balaji Veeramani <[email protected]>
Signed-off-by: srinathk10 <[email protected]>
…perator.py

Co-authored-by: Balaji Veeramani <[email protected]>
Signed-off-by: srinathk10 <[email protected]>
…perator.py

Co-authored-by: Balaji Veeramani <[email protected]>
Signed-off-by: srinathk10 <[email protected]>
Signed-off-by: Srinath Krishnamachari <[email protected]>
Signed-off-by: Srinath Krishnamachari <[email protected]>
Signed-off-by: Srinath Krishnamachari <[email protected]>
Signed-off-by: Srinath Krishnamachari <[email protected]>
@srinathk10 force-pushed the srinathk10-restart-actor-mark-pending branch from a2aac45 to 1b506b5 on October 18, 2024 21:05
Signed-off-by: Srinath Krishnamachari <[email protected]>
@srinathk10 force-pushed the srinathk10-restart-actor-mark-pending branch from 5e3fb9a to 217ca5b on October 18, 2024 23:03