From fa59ab4e0198b6fa28d0ec756ff5e3bf4212b125 Mon Sep 17 00:00:00 2001 From: SangBin Cho Date: Thu, 17 Oct 2024 13:47:44 -0700 Subject: [PATCH] fix flaky reference count test --- python/ray/tests/test_reference_counting.py | 3 +- src/ray/core_worker/core_worker.cc | 3 ++ .../transport/actor_task_submitter.cc | 1 + src/ray/gcs/gcs_server/gcs_actor_manager.cc | 45 +++++++++++-------- src/ray/gcs/gcs_server/gcs_actor_manager.h | 3 +- 5 files changed, 34 insertions(+), 21 deletions(-) diff --git a/python/ray/tests/test_reference_counting.py b/python/ray/tests/test_reference_counting.py index 0bd91460fe343..d5b20116e58d9 100644 --- a/python/ray/tests/test_reference_counting.py +++ b/python/ray/tests/test_reference_counting.py @@ -578,10 +578,11 @@ def should_not_be_run(self): # Test with implicit cancellation by letting the actor handle go out-of-scope. def test_implicit_cancel(): ref = ray.put(1) - Actor.remote({"foo": ref}) + print(Actor.remote({"foo": ref})) test_implicit_cancel() # Confirm that the ref object is not leaked. + check_refcounts({}) # Test with explicit cancellation via ray.kill(). diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 7ba53bf747972..068f9d769f446 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -2381,6 +2381,9 @@ Status CoreWorker::CreateActor(const RayFunction &function, RAY_LOG(ERROR).WithField(task_spec.ActorCreationId()) << "Failed to register actor. Error message: " << status.ToString(); + task_manager_->FailPendingTask(task_spec.TaskId(), + rpc::ErrorType::ACTOR_CREATION_FAILED, + &status); } else { RAY_UNUSED(actor_task_submitter_->SubmitActorCreationTask(task_spec)); } diff --git a/src/ray/core_worker/transport/actor_task_submitter.cc b/src/ray/core_worker/transport/actor_task_submitter.cc index babd1ba8dc6db..fe666c382f99f 100644 --- a/src/ray/core_worker/transport/actor_task_submitter.cc +++ b/src/ray/core_worker/transport/actor_task_submitter.cc @@ -138,6 +138,7 @@ Status ActorTaskSubmitter::SubmitActorCreationTask(TaskSpecification task_spec) rpc::RayErrorInfo ray_error_info; if (status.IsSchedulingCancelled()) { RAY_LOG(DEBUG).WithField(actor_id) << "Actor creation cancelled"; + RAY_LOG(ERROR).WithField(actor_id) << "SANG-TODO Actor creation cancelled"; task_finisher_.MarkTaskCanceled(task_id); if (reply.has_death_cause()) { ray_error_info.mutable_actor_died_error()->CopyFrom(reply.death_cause()); diff --git a/src/ray/gcs/gcs_server/gcs_actor_manager.cc b/src/ray/gcs/gcs_server/gcs_actor_manager.cc index 0c30514c1e32b..be80b0a5c35ea 100644 --- a/src/ray/gcs/gcs_server/gcs_actor_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_actor_manager.cc @@ -393,14 +393,14 @@ void GcsActorManager::HandleRegisterActor(rpc::RegisterActorRequest request, ActorID::FromBinary(request.task_spec().actor_creation_task_spec().actor_id()); RAY_LOG(INFO).WithField(actor_id.JobId()).WithField(actor_id) << "Registering actor"; - Status status = - RegisterActor(request, - [reply, send_reply_callback, actor_id]( - const std::shared_ptr &actor) { - RAY_LOG(INFO) << "Registered actor, job id = " << actor_id.JobId() - << ", actor id = " << actor_id; - GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK()); - }); + Status status = RegisterActor( + request, + [reply, send_reply_callback, actor_id](const std::shared_ptr &actor, + const Status &status) { + RAY_LOG(INFO) << "Registered actor, job id = " << actor_id.JobId() + << ", actor id = " << actor_id; + GCS_RPC_SEND_REPLY(send_reply_callback, reply, status); + }); if (!status.ok()) { RAY_LOG(WARNING).WithField(actor_id.JobId()).WithField(actor_id) << "Failed to register actor: " << status.ToString(); @@ -706,11 +706,11 @@ void GcsActorManager::HandleKillActorViaGcs(rpc::KillActorViaGcsRequest request, } Status GcsActorManager::RegisterActor(const ray::rpc::RegisterActorRequest &request, - RegisterActorCallback success_callback) { + RegisterActorCallback register_callback) { // NOTE: After the abnormal recovery of the network between GCS client and GCS server or // the GCS server is restarted, it is required to continue to register actor // successfully. - RAY_CHECK(success_callback); + RAY_CHECK(register_callback); const auto &actor_creation_task_spec = request.task_spec().actor_creation_task_spec(); auto actor_id = ActorID::FromBinary(actor_creation_task_spec.actor_id()); @@ -721,13 +721,13 @@ Status GcsActorManager::RegisterActor(const ray::rpc::RegisterActorRequest &requ // 1. The GCS client sends the `RegisterActor` request to the GCS server. // 2. The GCS client receives some network errors. // 3. The GCS client resends the `RegisterActor` request to the GCS server. - pending_register_iter->second.emplace_back(std::move(success_callback)); + pending_register_iter->second.emplace_back(std::move(register_callback)); } else { // 1. The GCS client sends the `RegisterActor` request to the GCS server. // 2. The GCS server flushes the actor to the storage and restarts before replying // to the GCS client. // 3. The GCS client resends the `RegisterActor` request to the GCS server. - success_callback(iter->second); + register_callback(iter->second, Status::OK()); } return Status::OK(); } @@ -772,7 +772,7 @@ Status GcsActorManager::RegisterActor(const ray::rpc::RegisterActorRequest &requ } } - actor_to_register_callbacks_[actor_id].emplace_back(std::move(success_callback)); + actor_to_register_callbacks_[actor_id].emplace_back(register_callback); registered_actors_.emplace(actor->GetActorID(), actor); function_manager_.AddJobReference(actor_id.JobId()); @@ -793,25 +793,32 @@ Status GcsActorManager::RegisterActor(const ray::rpc::RegisterActorRequest &requ // The backend storage is supposed to be reliable, so the status must be ok. RAY_CHECK_OK(gcs_table_storage_->ActorTaskSpecTable().Put( - actor_id, request.task_spec(), [this, actor](const Status &status) { + actor_id, + request.task_spec(), + [this, actor, register_callback](const Status &status) { RAY_CHECK_OK(gcs_table_storage_->ActorTable().Put( actor->GetActorID(), *actor->GetMutableActorTableData(), - [this, actor](const Status &status) { + [this, actor, register_callback](const Status &status) { // The backend storage is supposed to be reliable, so the status must be ok. RAY_CHECK_OK(status); actor->WriteActorExportEvent(); - // If a creator dies before this callback is called, the actor could have - // been already destroyed. It is okay not to invoke a callback because we - // don't need to reply to the creator as it is already dead. auto registered_actor_it = registered_actors_.find(actor->GetActorID()); + auto reply_status = Status::OK(); if (registered_actor_it == registered_actors_.end()) { // NOTE(sang): This logic assumes that the ordering of backend call is // guaranteed. It is currently true because we use a single TCP socket to // call the default Redis backend. If ordering is not guaranteed, we // should overwrite the actor state to DEAD to avoid race condition. + RAY_LOG(INFO).WithField(actor->GetActorID()) + << "Actor is killed before dependency is prepared."; + RAY_CHECK(actor_to_register_callbacks_.find(actor->GetActorID()) == + actor_to_register_callbacks_.end()); + register_callback( + actor, Status::SchedulingCancelled("Actor creation cancelled.")); return; } + RAY_CHECK_OK(gcs_publisher_->PublishActor( actor->GetActorID(), actor->GetActorTableData(), nullptr)); // Invoke all callbacks for all registration requests of this actor @@ -824,7 +831,7 @@ Status GcsActorManager::RegisterActor(const ray::rpc::RegisterActorRequest &requ auto callbacks = std::move(iter->second); actor_to_register_callbacks_.erase(iter); for (auto &callback : callbacks) { - callback(actor); + callback(actor, Status::OK()); } })); })); diff --git a/src/ray/gcs/gcs_server/gcs_actor_manager.h b/src/ray/gcs/gcs_server/gcs_actor_manager.h index c05ba9ebd0dcb..b58e50ecf950c 100644 --- a/src/ray/gcs/gcs_server/gcs_actor_manager.h +++ b/src/ray/gcs/gcs_server/gcs_actor_manager.h @@ -254,7 +254,8 @@ class GcsActor { std::optional last_metric_state_; }; -using RegisterActorCallback = std::function)>; +using RegisterActorCallback = + std::function, const Status &status)>; using RestartActorCallback = std::function)>; using CreateActorCallback = std::function, const rpc::PushTaskReply &reply, const Status &status)>;