From 7cb7486e55918ceb737cd0b8c3903561309d32c5 Mon Sep 17 00:00:00 2001 From: Quantum Date: Wed, 21 Aug 2024 01:22:30 -0400 Subject: [PATCH] Use lowest tier judge only and hold the rest in reserve --- judge/bridge/judge_handler.py | 4 +++ judge/bridge/judge_list.py | 53 ++++++++++++++++++++++++++++++----- 2 files changed, 50 insertions(+), 7 deletions(-) diff --git a/judge/bridge/judge_handler.py b/judge/bridge/judge_handler.py index 073de27262..3d72a81ab2 100644 --- a/judge/bridge/judge_handler.py +++ b/judge/bridge/judge_handler.py @@ -59,6 +59,7 @@ def __init__(self, request, client_address, server, judges): self.load = 1e100 self.name = None self.is_disabled = False + self.tier = None self.batch_id = None self.in_batch = False self._stop_ping = threading.Event() @@ -107,6 +108,9 @@ def _authenticate(self, id, key): json_log.warning(self._make_json_log(action='auth', judge=id, info='judge authenticated but is blocked')) return False + # Cache judge tier for use by JudgeList + self.tier = judge.tier + return True def _connected(self): diff --git a/judge/bridge/judge_list.py b/judge/bridge/judge_list.py index b65d0ddb82..0499b2d7e8 100644 --- a/judge/bridge/judge_list.py +++ b/judge/bridge/judge_list.py @@ -25,16 +25,19 @@ def __init__(self): self.node_map = {} self.submission_map = {} self.lock = RLock() + self.min_tier = None def _handle_free_judge(self, judge): with self.lock: + if judge.tier > self.min_tier: + return + node = self.queue.first priority = 0 while node: if isinstance(node.value, PriorityMarker): priority = node.value.priority + 1 - elif priority >= REJUDGE_PRIORITY and self.count_not_disabled() > 1 and sum( - not judge.working and not judge.is_disabled for judge in self.judges) <= 1: + elif priority >= REJUDGE_PRIORITY and self.should_reserve_judge(): return else: id, problem, language, source, judge_id = node.value @@ -52,14 +55,47 @@ def _handle_free_judge(self, judge): break node = node.next - def count_not_disabled(self): - return sum(not judge.is_disabled for judge in self.judges) + def _update_min_tier(self): + with self.lock: + old = self.min_tier + try: + self.min_tier = min(judge.tier for judge in self.judges + if judge.tier is not None and not judge.is_disabled) + except ValueError: + self.min_tier = None + + if old != self.min_tier: + logger.info('Minimum tier changed from %s to %s', old, self.min_tier) + + # We must be adding a judge, let register handle the new judge. + if old is None: + return + + # If the new min tier is larger, then we should treat all judges of the new tier as free and start grading. + # This is only possible when removing a judge. + if self.min_tier is not None and self.min_tier > old: + for judge in self.current_tier_judges(): + logger.info('Minimum tier increased, trying to dispatch to judge: %s', judge.name) + if not judge.working: + self._handle_free_judge(judge) + + def current_tier_judges(self): + return [judge for judge in self.judges if judge.tier == self.min_tier and not judge.is_disabled] + + def should_reserve_judge(self): + judges = self.current_tier_judges() + if len(judges) <= 1: + return False + + free_judges = sum(not judge.working for judge in judges) + return free_judges <= 1 def register(self, judge): with self.lock: # Disconnect all judges with the same name, see self.disconnect(judge, force=True) self.judges.add(judge) + self._update_min_tier() self._handle_free_judge(judge) def disconnect(self, judge_id, force=False): @@ -77,6 +113,7 @@ def update_disable_judge(self, judge_id, is_disabled): for judge in self.judges: if judge.name == judge_id: judge.is_disabled = is_disabled + self._update_min_tier() def remove(self, judge): with self.lock: @@ -87,11 +124,13 @@ def remove(self, judge): except KeyError: pass self.judges.discard(judge) + self._update_min_tier() # Since we reserve a judge for high priority submissions when there are more than one, # we'll need to start judging if there is exactly one judge and it's free. - if len(self.judges) == 1: - judge = next(iter(self.judges)) + current_tier = self.current_tier_judges() + if len(current_tier) == 1: + judge = next(iter(current_tier)) if not judge.working: self._handle_free_judge(judge) @@ -131,7 +170,7 @@ def judge(self, id, problem, language, source, judge_id, priority): # idempotent. return - candidates = [judge for judge in self.judges if judge.can_judge(problem, language, judge_id)] + candidates = [judge for judge in self.current_tier_judges() if judge.can_judge(problem, language, judge_id)] available = [judge for judge in candidates if not judge.working and not judge.is_disabled] if judge_id: logger.info('Specified judge %s is%savailable', judge_id, ' ' if available else ' not ')