Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Core] fix flaky reference count test by cleaning up borrowed references if actor is killed before dependencies are resolved #48091

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion python/ray/tests/test_reference_counting.py
Original file line number Diff line number Diff line change
Expand Up @@ -578,10 +578,11 @@ def should_not_be_run(self):
# Test with implicit cancellation by letting the actor handle go out-of-scope.
def test_implicit_cancel():
ref = ray.put(1)
Actor.remote({"foo": ref})
print(Actor.remote({"foo": ref}))

test_implicit_cancel()
# Confirm that the ref object is not leaked.

check_refcounts({})

# Test with explicit cancellation via ray.kill().
Expand Down
3 changes: 3 additions & 0 deletions src/ray/core_worker/core_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2381,6 +2381,9 @@ Status CoreWorker::CreateActor(const RayFunction &function,
RAY_LOG(ERROR).WithField(task_spec.ActorCreationId())
<< "Failed to register actor. Error message: "
<< status.ToString();
task_manager_->FailPendingTask(task_spec.TaskId(),
rpc::ErrorType::ACTOR_CREATION_FAILED,
&status);
} else {
RAY_UNUSED(actor_task_submitter_->SubmitActorCreationTask(task_spec));
}
Expand Down
1 change: 1 addition & 0 deletions src/ray/core_worker/transport/actor_task_submitter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ Status ActorTaskSubmitter::SubmitActorCreationTask(TaskSpecification task_spec)
rpc::RayErrorInfo ray_error_info;
if (status.IsSchedulingCancelled()) {
RAY_LOG(DEBUG).WithField(actor_id) << "Actor creation cancelled";
RAY_LOG(ERROR).WithField(actor_id) << "SANG-TODO Actor creation cancelled";
task_finisher_.MarkTaskCanceled(task_id);
if (reply.has_death_cause()) {
ray_error_info.mutable_actor_died_error()->CopyFrom(reply.death_cause());
Expand Down
45 changes: 26 additions & 19 deletions src/ray/gcs/gcs_server/gcs_actor_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -393,14 +393,14 @@ void GcsActorManager::HandleRegisterActor(rpc::RegisterActorRequest request,
ActorID::FromBinary(request.task_spec().actor_creation_task_spec().actor_id());

RAY_LOG(INFO).WithField(actor_id.JobId()).WithField(actor_id) << "Registering actor";
Status status =
RegisterActor(request,
[reply, send_reply_callback, actor_id](
const std::shared_ptr<gcs::GcsActor> &actor) {
RAY_LOG(INFO) << "Registered actor, job id = " << actor_id.JobId()
<< ", actor id = " << actor_id;
GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK());
});
Status status = RegisterActor(
request,
[reply, send_reply_callback, actor_id](const std::shared_ptr<gcs::GcsActor> &actor,
const Status &status) {
RAY_LOG(INFO) << "Registered actor, job id = " << actor_id.JobId()
<< ", actor id = " << actor_id;
GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
});
if (!status.ok()) {
RAY_LOG(WARNING).WithField(actor_id.JobId()).WithField(actor_id)
<< "Failed to register actor: " << status.ToString();
Expand Down Expand Up @@ -706,11 +706,11 @@ void GcsActorManager::HandleKillActorViaGcs(rpc::KillActorViaGcsRequest request,
}

Status GcsActorManager::RegisterActor(const ray::rpc::RegisterActorRequest &request,
RegisterActorCallback success_callback) {
RegisterActorCallback register_callback) {
// NOTE: After the network between the GCS client and the GCS server recovers from
// a failure, or after the GCS server is restarted, actor registration must still
// be able to complete successfully.
RAY_CHECK(success_callback);
RAY_CHECK(register_callback);
const auto &actor_creation_task_spec = request.task_spec().actor_creation_task_spec();
auto actor_id = ActorID::FromBinary(actor_creation_task_spec.actor_id());

Expand All @@ -721,13 +721,13 @@ Status GcsActorManager::RegisterActor(const ray::rpc::RegisterActorRequest &requ
// 1. The GCS client sends the `RegisterActor` request to the GCS server.
// 2. The GCS client receives some network errors.
// 3. The GCS client resends the `RegisterActor` request to the GCS server.
pending_register_iter->second.emplace_back(std::move(success_callback));
pending_register_iter->second.emplace_back(std::move(register_callback));
} else {
// 1. The GCS client sends the `RegisterActor` request to the GCS server.
// 2. The GCS server flushes the actor to the storage and restarts before replying
// to the GCS client.
// 3. The GCS client resends the `RegisterActor` request to the GCS server.
success_callback(iter->second);
register_callback(iter->second, Status::OK());
}
return Status::OK();
}
Expand Down Expand Up @@ -772,7 +772,7 @@ Status GcsActorManager::RegisterActor(const ray::rpc::RegisterActorRequest &requ
}
}

actor_to_register_callbacks_[actor_id].emplace_back(std::move(success_callback));
actor_to_register_callbacks_[actor_id].emplace_back(register_callback);
registered_actors_.emplace(actor->GetActorID(), actor);
function_manager_.AddJobReference(actor_id.JobId());

Expand All @@ -793,25 +793,32 @@ Status GcsActorManager::RegisterActor(const ray::rpc::RegisterActorRequest &requ

// The backend storage is supposed to be reliable, so the status must be ok.
RAY_CHECK_OK(gcs_table_storage_->ActorTaskSpecTable().Put(
actor_id, request.task_spec(), [this, actor](const Status &status) {
actor_id,
request.task_spec(),
[this, actor, register_callback](const Status &status) {
RAY_CHECK_OK(gcs_table_storage_->ActorTable().Put(
actor->GetActorID(),
*actor->GetMutableActorTableData(),
[this, actor](const Status &status) {
[this, actor, register_callback](const Status &status) {
// The backend storage is supposed to be reliable, so the status must be ok.
RAY_CHECK_OK(status);
actor->WriteActorExportEvent();
// If a creator dies before this callback is called, the actor could have
// been already destroyed. In that case, invoke the registration callback with
// a SchedulingCancelled status instead of publishing the actor.
auto registered_actor_it = registered_actors_.find(actor->GetActorID());
auto reply_status = Status::OK();
if (registered_actor_it == registered_actors_.end()) {
// NOTE(sang): This logic assumes that the ordering of backend call is
// guaranteed. It is currently true because we use a single TCP socket to
// call the default Redis backend. If ordering is not guaranteed, we
// should overwrite the actor state to DEAD to avoid race condition.
RAY_LOG(INFO).WithField(actor->GetActorID())
<< "Actor is killed before dependency is prepared.";
RAY_CHECK(actor_to_register_callbacks_.find(actor->GetActorID()) ==
actor_to_register_callbacks_.end());
register_callback(
actor, Status::SchedulingCancelled("Actor creation cancelled."));
return;
}

RAY_CHECK_OK(gcs_publisher_->PublishActor(
actor->GetActorID(), actor->GetActorTableData(), nullptr));
// Invoke all callbacks for all registration requests of this actor
Expand All @@ -824,7 +831,7 @@ Status GcsActorManager::RegisterActor(const ray::rpc::RegisterActorRequest &requ
auto callbacks = std::move(iter->second);
actor_to_register_callbacks_.erase(iter);
for (auto &callback : callbacks) {
callback(actor);
callback(actor, Status::OK());
}
}));
}));
Expand Down
3 changes: 2 additions & 1 deletion src/ray/gcs/gcs_server/gcs_actor_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,8 @@ class GcsActor {
std::optional<rpc::ActorTableData::ActorState> last_metric_state_;
};

using RegisterActorCallback = std::function<void(std::shared_ptr<GcsActor>)>;
using RegisterActorCallback =
std::function<void(std::shared_ptr<GcsActor>, const Status &status)>;
using RestartActorCallback = std::function<void(std::shared_ptr<GcsActor>)>;
using CreateActorCallback = std::function<void(
std::shared_ptr<GcsActor>, const rpc::PushTaskReply &reply, const Status &status)>;
Expand Down