Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Add metrics for Task RSS HWM. #48052

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions python/ray/tests/test_metrics_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -749,6 +749,31 @@ def verify_mem_cleaned():
wait_for_condition(verify_mem_cleaned, timeout=30)


@pytest.mark.skipif(sys.platform != "linux", reason="Only works in Linux.")
def test_task_rss_hwm_kb(shutdown_only):
    """Run a recursive remote task and check the task RSS high-water-mark metric.

    Once the tasks have finished, the first exported metric whose name starts
    with ``ray_task_rss_hwm_kb`` must have every sample labelled
    ``Name == "fib"``.
    """
    addr = ray.init(num_cpus=2)

    @ray.remote
    def fib(i):
        # Naive recursive Fibonacci: each call above the base case submits
        # two nested remote tasks and sums their results.
        if i <= 2:
            return 1
        smaller = [fib.remote(i - 1), fib.remote(i - 2)]
        return sum(ray.get(smaller))

    assert ray.get(fib.remote(8)) == 21

    def verify_components():
        # Match by prefix: presumably the histogram is exported under several
        # names (base name, _count, _sum) -- confirm against the exporter.
        for metric_name, metric_samples in raw_metrics(addr).items():
            if not metric_name.startswith("ray_task_rss_hwm_kb"):
                continue
            for entry in metric_samples:
                assert entry.labels["Name"] == "fib"
            return True
        return False

    wait_for_condition(verify_components, timeout=30)


def test_prometheus_file_based_service_discovery(ray_start_cluster):
# Make sure Prometheus service discovery file is correctly written
# when number of nodes are dynamically changed.
Expand Down
94 changes: 94 additions & 0 deletions src/ray/core_worker/core_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

#include <google/protobuf/util/json_util.h>

#include <fstream>

#include "absl/cleanup/cleanup.h"
#include "absl/strings/str_format.h"
#include "boost/fiber/all.hpp"
Expand Down Expand Up @@ -86,6 +88,96 @@ class ScopedTaskMetricSetter {
bool is_retry_;
};

#ifdef __linux__
// Records the peak resident set size (VmHWM) reached by this process while a single
// normal task runs, and reports it through the `task_rss_hwm_kb` metric.
//
// Construct the tracker right before the task starts and destroy it right after the
// task finishes. The constructor resets the kernel's high-water mark; the destructor
// reads it back and records it. If the reset cannot be performed, nothing is recorded,
// because the value would still contain peaks from whatever ran in this process before.
//
// NOTE(review): VmHWM is read from /proc/self/status, i.e. it is a per-process value.
// This presumably assumes normal tasks do not overlap inside one worker -- confirm.
class ScopedTaskRssHwmTracker {
 public:
  // `spec` must outlive this object.
  explicit ScopedTaskRssHwmTracker(const TaskSpecification &spec)
      : enabled_(spec.IsNormalTask()), spec_(spec) {
    if (enabled_) {
      // Tracking stays enabled only if the high-water mark was really reset.
      enabled_ = ClearVmHWM();
    }
  }

  // The destructor records a metric sample, so a copy would record the task twice.
  ScopedTaskRssHwmTracker(const ScopedTaskRssHwmTracker &) = delete;
  ScopedTaskRssHwmTracker &operator=(const ScopedTaskRssHwmTracker &) = delete;

  ~ScopedTaskRssHwmTracker() {
    if (!enabled_) {
      return;
    }
    const size_t hwm_kb = ReadVmHWMkB();
    // 0 means "could not read"; skip the sample rather than record a bogus value.
    if (hwm_kb > 0) {
      stats::STATS_task_rss_hwm_kb.Record(
          hwm_kb,
          {{"Name", spec_.FunctionDescriptor()->CallString()},
           {"JobId", spec_.JobId().Hex()}});
    }
  }

 private:
  // Resets VmHWM by writing `kSelfClearRefsValue` to /proc/self/clear_refs.
  // Returns true only if the file could be opened AND the write was accepted. If the
  // number cannot be cleared it is not reliable, and the caller must not record it.
  static bool ClearVmHWM() {
    std::ofstream clear_refs_file(kSelfClearRefsFile.data());
    if (!clear_refs_file.is_open()) {
      RAY_LOG(DEBUG) << "Failed to open " << kSelfClearRefsFile << " to clear VmHWM";
      return false;
    }
    clear_refs_file << kSelfClearRefsValue;
    // The stream is buffered, so the data reaches the kernel when the buffer is flushed
    // by close(). close() sets failbit if that flush fails, e.g. when the kernel does
    // not support this clear_refs value and rejects the write.
    clear_refs_file.close();
    if (clear_refs_file.fail()) {
      RAY_LOG(DEBUG) << "Failed to write " << kSelfClearRefsValue << " to "
                     << kSelfClearRefsFile << ", not recording VmHWM for the task.";
      return false;
    }
    RAY_LOG(DEBUG) << "Cleared VmHWM";
    return true;
  }

  // Reads VmHWM in kB from /proc/self/status. Returns 0 if the file cannot be opened,
  // the VmHWM line is missing, or the line is not in the expected "<number> kB" form.
  static size_t ReadVmHWMkB() {
    // ifstream's constructor does not accept string_view. The constants below are
    // views over string literals, so data() is null-terminated.
    std::ifstream status_file(kSelfStatusFile.data());
    if (!status_file.is_open()) {
      RAY_LOG(DEBUG) << "Failed to open " << kSelfStatusFile << " to read VmHWM";
      return 0;
    }
    std::string line;
    while (std::getline(status_file, line)) {
      // Example line: VmHWM: 1234 kB
      if (line.compare(0, kVmHWM.size(), kVmHWM) != 0) {
        continue;
      }
      size_t hwm = 0;
      std::string unit;
      std::istringstream iss(line.substr(kVmHWM.size()));
      if (!(iss >> hwm >> unit) || unit != "kB") {
        RAY_LOG(DEBUG) << "Unexpected VmHWM format: " << line
                       << ", not recording VmHWM for the task.";
        return 0;
      }
      return hwm;
    }
    RAY_LOG(DEBUG) << "Failed to find VmHWM in " << kSelfStatusFile;
    return 0;
  }

  // False when the task is not a normal task or when VmHWM could not be reset.
  bool enabled_;
  const TaskSpecification &spec_;

  constexpr static std::string_view kSelfStatusFile = "/proc/self/status";
  constexpr static std::string_view kVmHWM = "VmHWM:";
  // Write 5 to /proc/self/clear_refs to drop previous VmHWM value. This is needed
  // because the worker may have served other tasks and gained a high VmHWM value.
  // https://man7.org/linux/man-pages/man5/proc_pid_clear_refs.5.html
  constexpr static std::string_view kSelfClearRefsFile = "/proc/self/clear_refs";
  constexpr static std::string_view kSelfClearRefsValue = "5";
};

#else

// Non-Linux platforms (e.g. macOS, Windows): the tracker does nothing. It exists only
// so that call sites can construct it unconditionally.
class ScopedTaskRssHwmTracker {
 public:
  explicit ScopedTaskRssHwmTracker(const TaskSpecification & /*spec*/) {}
};

#endif

using ActorLifetime = ray::rpc::JobConfig_ActorLifetime;

// Helper function converts GetObjectLocationsOwnerReply to ObjectLocation
Expand Down Expand Up @@ -3019,6 +3111,8 @@ Status CoreWorker::ExecuteTask(
task_type = TaskType::ACTOR_TASK;
}

ScopedTaskRssHwmTracker rss_hwm_tracker(task_spec);

std::shared_ptr<LocalMemoryBuffer> creation_task_exception_pb_bytes = nullptr;

std::vector<ConcurrencyGroup> defined_concurrency_groups = {};
Expand Down
21 changes: 21 additions & 0 deletions src/ray/stats/metric_defs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,27 @@ DEFINE_stats(
(),
ray::stats::GAUGE);

/// Histogram of the peak resident set size (VmHWM), in kB, of finished tasks.
/// Only recorded on Linux >= 4.0 (the kernel version that added the
/// /proc/self/clear_refs value used to reset the peak) and with
/// CONFIG_PROC_PAGE_MONITOR set.
/// Only recorded on normal tasks, not actors.
DEFINE_stats(task_rss_hwm_kb,
"Memory High Watermark of resident memory of finished tasks.",
("Name", "JobId"),
// Bucket boundaries are in kB, doubling from 64MB to 128GB.
// An empty Python worker is typically ~90MB.
(1024.0 * 64, // 64 MB
1024.0 * 128, // 128 MB
1024.0 * 256, // 256 MB
1024.0 * 512, // 512 MB
1024.0 * 1024, // 1 GB
1024.0 * 1024 * 2, // 2 GB
1024.0 * 1024 * 4, // 4 GB
1024.0 * 1024 * 8, // 8 GB
1024.0 * 1024 * 16, // 16 GB
1024.0 * 1024 * 32, // 32 GB
1024.0 * 1024 * 64, // 64 GB
1024.0 * 1024 * 128, // 128 GB
),
ray::stats::HISTOGRAM);

/// Tracks actors by state, including pending, running, and idle actors.
///
/// To avoid metric collection conflicts between components reporting on the same task,
Expand Down
2 changes: 2 additions & 0 deletions src/ray/stats/metric_defs.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ namespace stats {

/// Tasks stats, broken down by state.
DECLARE_stats(tasks);
/// Memory high watermark (peak resident memory, in kB) of finished tasks.
DECLARE_stats(task_rss_hwm_kb);

/// Actor stats, broken down by state.
DECLARE_stats(actors);
Expand Down