diff --git a/python/ray/tests/test_metrics_agent.py b/python/ray/tests/test_metrics_agent.py
index 990e917921b75..8929a0cc894f6 100644
--- a/python/ray/tests/test_metrics_agent.py
+++ b/python/ray/tests/test_metrics_agent.py
@@ -749,6 +749,35 @@ def verify_mem_cleaned():
     wait_for_condition(verify_mem_cleaned, timeout=30)
 
 
+@pytest.mark.skipif(sys.platform != "linux", reason="Only works in Linux.")
+def test_task_rss_hwm_kb(shutdown_only):
+    """Check that finished tasks export the ray_task_rss_hwm_kb metric.
+
+    Samples of the metric are expected to carry the task name in the "Name" label.
+    """
+    addr = ray.init(num_cpus=2)
+
+    @ray.remote
+    def fib(i):
+        if i <= 2:
+            return 1
+        return sum(ray.get([fib.remote(i - 1), fib.remote(i - 2)]))
+
+    assert ray.get(fib.remote(8)) == 21
+
+    def verify_components():
+        metrics = raw_metrics(addr)
+        for name, samples in metrics.items():
+            # Histogram, _count, _sum
+            if name.startswith("ray_task_rss_hwm_kb"):
+                for sample in samples:
+                    assert sample.labels["Name"] == "fib"
+                return True
+        return False
+
+    wait_for_condition(verify_components, timeout=30)
+
+
 def test_prometheus_file_based_service_discovery(ray_start_cluster):
     # Make sure Prometheus service discovery file is correctly written
     # when number of nodes are dynamically changed.
diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc
index 7ba53bf747972..df2bb5bbdd3fb 100644
--- a/src/ray/core_worker/core_worker.cc
+++ b/src/ray/core_worker/core_worker.cc
@@ -20,6 +20,8 @@
 
 #include
 
+#include
+
 #include "absl/cleanup/cleanup.h"
 #include "absl/strings/str_format.h"
 #include "boost/fiber/all.hpp"
@@ -86,6 +88,109 @@ class ScopedTaskMetricSetter {
   bool is_retry_;
 };
 
+#ifdef __linux__
+// If feasible and needed, record RSS HWM on a task.
+// This class is inited before the task starts and destructed after the task finishes.
+//
+// On construction (normal tasks only) it resets the process's VmHWM by writing to
+// /proc/self/clear_refs; on destruction it reads VmHWM from /proc/self/status and
+// records it to the task_rss_hwm_kb metric. If the reset fails, nothing is recorded.
+class ScopedTaskRssHwmTracker {
+ public:
+  // `spec` must outlive this object.
+  explicit ScopedTaskRssHwmTracker(const TaskSpecification &spec)
+      : enabled_(spec.IsNormalTask()), spec_(spec) {
+    if (enabled_) {
+      ClearVmHWM();
+    }
+  }
+
+  // Record the VmHWM read at destruction, labeled with the call string and job id.
+  ~ScopedTaskRssHwmTracker() {
+    if (enabled_) {
+      size_t hwm_kb = ReadVmHWMkB();
+      if (hwm_kb > 0) {
+        stats::STATS_task_rss_hwm_kb.Record(
+            hwm_kb,
+            {{"Name", spec_.FunctionDescriptor()->CallString()},
+             {"JobId", spec_.JobId().Hex()}});
+      }
+    }
+  }
+
+ private:
+  // If you can't clear the number, the number is not reliable and we don't record.
+  void ClearVmHWM() {
+    std::ofstream clear_refs_file(kSelfClearRefsFile.data());
+    if (!clear_refs_file.is_open()) {
+      RAY_LOG(DEBUG) << "Failed to open " << kSelfClearRefsFile << " to clear VmHWM";
+      enabled_ = false;
+      return;
+    }
+    clear_refs_file << kSelfClearRefsValue;
+    clear_refs_file.close();
+    // The buffered write is only issued on flush/close, so a rejected value (e.g. on a
+    // kernel without support for it) shows up as a failed stream state here.
+    if (clear_refs_file.fail()) {
+      RAY_LOG(DEBUG) << "Failed to write to " << kSelfClearRefsFile
+                     << " to clear VmHWM";
+      enabled_ = false;
+      return;
+    }
+    RAY_LOG(DEBUG) << "Cleared VmHWM";
+  }
+
+  // Read VmHWM in kB from /proc/self/status. If failed, return 0.
+  static size_t ReadVmHWMkB() {
+    // ifstream::ctor does not accept string_view. Pity.
+    std::ifstream status_file(kSelfStatusFile.data());
+    if (!status_file.is_open()) {
+      RAY_LOG(DEBUG) << "Failed to open " << kSelfStatusFile << " to read VmHWM";
+      return 0;
+    }
+    std::string line;
+    while (std::getline(status_file, line)) {
+      // Example line: VmHWM: 1234 kB
+      if (line.compare(0, kVmHWM.size(), kVmHWM) == 0) {
+        size_t hwm = 0;
+        std::string unit;
+        std::istringstream iss(line.substr(kVmHWM.size()));
+        iss >> hwm >> unit;
+        if (unit != "kB") {
+          RAY_LOG(DEBUG) << "Unexpected VmHWM format: " << line
+                         << ", not recording VmHWM for the task.";
+          return 0;
+        }
+        return hwm;
+      }
+    }
+    status_file.close();
+    RAY_LOG(DEBUG) << "Failed to find VmHWM in " << kSelfStatusFile;
+    return 0;
+  }
+
+  bool enabled_;
+  const TaskSpecification &spec_;
+
+  constexpr static std::string_view kSelfStatusFile = "/proc/self/status";
+  constexpr static std::string_view kVmHWM = "VmHWM:";
+  // Write 5 to /proc/self/clear_refs to drop previous VmHWM value. This is needed
+  // because the worker may have served other tasks and gained a high VmHWM value.
+  // https://man7.org/linux/man-pages/man5/proc_pid_clear_refs.5.html
+  constexpr static std::string_view kSelfClearRefsFile = "/proc/self/clear_refs";
+  constexpr static std::string_view kSelfClearRefsValue = "5";
+};
+
+#else
+
+// No-op in MacOS or Windows.
+class ScopedTaskRssHwmTracker {
+ public:
+  explicit ScopedTaskRssHwmTracker(const TaskSpecification &spec) {}
+};
+
+#endif
+
 using ActorLifetime = ray::rpc::JobConfig_ActorLifetime;
 
 // Helper function converts GetObjectLocationsOwnerReply to ObjectLocation
@@ -3019,6 +3124,8 @@ Status CoreWorker::ExecuteTask(
     task_type = TaskType::ACTOR_TASK;
   }
 
+  ScopedTaskRssHwmTracker rss_hwm_tracker(task_spec);
+
   std::shared_ptr creation_task_exception_pb_bytes = nullptr;
 
   std::vector defined_concurrency_groups = {};
diff --git a/src/ray/stats/metric_defs.cc b/src/ray/stats/metric_defs.cc
index 5d393acdce8d5..30c678e5f0ec5 100644
--- a/src/ray/stats/metric_defs.cc
+++ b/src/ray/stats/metric_defs.cc
@@ -52,6 +52,27 @@ DEFINE_stats(
     (),
     ray::stats::GAUGE);
 
+/// Only recorded on Linux >= 4.0 and with CONFIG_PROC_PAGE_MONITOR set.
+/// Only recorded on normal tasks, not actors.
+DEFINE_stats(task_rss_hwm_kb,
+             "Memory High Watermark of resident memory of finished tasks.",
+             ("Name", "JobId"),
+             // From 64MB to 128GB. An empty Python worker is typically ~90MB.
+             (1024.0 * 64,          // 64 MB
+              1024.0 * 128,         // 128 MB
+              1024.0 * 256,         // 256 MB
+              1024.0 * 512,         // 512 MB
+              1024.0 * 1024,        // 1 GB
+              1024.0 * 1024 * 2,    // 2 GB
+              1024.0 * 1024 * 4,    // 4 GB
+              1024.0 * 1024 * 8,    // 8 GB
+              1024.0 * 1024 * 16,   // 16 GB
+              1024.0 * 1024 * 32,   // 32 GB
+              1024.0 * 1024 * 64,   // 64 GB
+              1024.0 * 1024 * 128,  // 128 GB
+             ),
+             ray::stats::HISTOGRAM);
+
 /// Tracks actors by state, including pending, running, and idle actors.
 ///
 /// To avoid metric collection conflicts between components reporting on the same task,
diff --git a/src/ray/stats/metric_defs.h b/src/ray/stats/metric_defs.h
index d76c64e7f42f0..77ca2d4ce60e8 100644
--- a/src/ray/stats/metric_defs.h
+++ b/src/ray/stats/metric_defs.h
@@ -44,6 +44,8 @@ namespace stats {
 
 /// Tasks stats, broken down by state.
 DECLARE_stats(tasks);
+/// Memory High Watermark of resident memory of tasks.
+DECLARE_stats(task_rss_hwm_kb);
 
 /// Actor stats, broken down by state.
 DECLARE_stats(actors);