diff --git a/benchmarks/utils/evaluation.py b/benchmarks/utils/evaluation.py index 09c17a6fa..8815f817f 100644 --- a/benchmarks/utils/evaluation.py +++ b/benchmarks/utils/evaluation.py @@ -146,6 +146,24 @@ def _create_error_output( instance=instance.data, ) + def _create_error_output_with_metadata( + self, + instance: EvalInstance, + error: Exception, + attempt: int, + datapoint_id: UUID | None, + ) -> EvalOutput: + """Create an EvalOutput with proper metadata for timeout/deadlock errors.""" + error_output = self._create_error_output(instance, error, attempt) + if error_output.metadata is None: + error_output.metadata = self.metadata.model_copy(deep=True) + if self.metadata.lmnr is not None: + error_output.metadata.lmnr = LaminarEvalMetadata( + eval_id=self.metadata.lmnr.eval_id, + datapoint_id=datapoint_id, + ) + return error_output + def _capture_conversation_archive( self, workspace: RemoteWorkspace, @@ -317,6 +335,24 @@ def _run_iterative_mode( critic = self.metadata.critic all_outputs: List[EvalOutput] = [] + # Parse timeout config once, outside retry loop (env var won't change between attempts) + try: + no_progress_timeout = int( + os.getenv("EVALUATION_NO_PROGRESS_TIMEOUT", "1800") + ) + except ValueError: + logger.warning( + "Invalid EVALUATION_NO_PROGRESS_TIMEOUT value, using default 1800s" + ) + no_progress_timeout = 1800 + + if no_progress_timeout <= 0: + logger.warning( + f"Invalid EVALUATION_NO_PROGRESS_TIMEOUT={no_progress_timeout}, " + "using default 1800s" + ) + no_progress_timeout = 1800 + for attempt in range(1, self.metadata.max_attempts + 1): self.current_attempt = attempt logger.info(f"Starting attempt {attempt}/{self.metadata.max_attempts}") @@ -401,6 +437,10 @@ def attempt_on_result(instance: EvalInstance, out: EvalOutput) -> None: total=len(futures), desc=f"Attempt {attempt}", leave=False ) + # Track progress for deadlock detection + timed_out_count = 0 + last_progress_time = time.monotonic() + while pending: # Wait for any future to complete, with short timeout to check # for per-instance timeouts @@ -413,6 +453,7 @@ def attempt_on_result(instance: EvalInstance, out: EvalOutput) -> None: # Process completed futures for fut in done: progress.update(1) + last_progress_time = time.monotonic() try: instance, out = fut.result() pending_info = pending_instances.get(fut) @@ -447,40 +488,90 @@ def attempt_on_result(instance: EvalInstance, out: EvalOutput) -> None: for fut in timed_out_futures: pending.discard(fut) progress.update(1) + timed_out_count += 1 + last_progress_time = time.monotonic() pending_info = pending_instances.get(fut) - if pending_info: - inst = pending_info.instance + if pending_info is None: + # This indicates a bookkeeping bug - future is in pending + # but not tracked in pending_instances. Log and skip. logger.error( - f"Instance {inst.id} timed out after " - f"{self.instance_timeout}s" + "BUG: Timed out future missing from pending_instances. " + "Error output will not be created for this instance." ) - error_output = self._create_error_output( + fut.cancel() + continue + inst = pending_info.instance + logger.error( + f"Instance {inst.id} timed out after " + f"{self.instance_timeout}s" + ) + error_output = self._create_error_output_with_metadata( + inst, + TimeoutError( + f"Instance did not complete within " + f"{self.instance_timeout}s timeout" + ), + attempt, + pending_info.datapoint_id, + ) + attempt_on_result(inst, error_output) + # Note: fut.cancel() only prevents unstarted futures from + # starting. Running workers will continue until pool shutdown. + fut.cancel() + + # Deadlock detection: if no progress for too long, force terminate + time_since_progress = now - last_progress_time + if pending and time_since_progress > no_progress_timeout: + logger.error( + f"DEADLOCK DETECTED: No progress for " + f"{time_since_progress / 60:.1f} minutes with " + f"{len(pending)} pending instances. " + f"Force terminating stuck workers." + ) + for fut in list(pending): + # Increment per-instance for consistency with per-instance + # timeout handling above (line 491) + timed_out_count += 1 + pending_info = pending_instances.get(fut) + if pending_info is None: + # This indicates a bookkeeping bug - future is in pending + # but not tracked in pending_instances. Log and skip. + logger.error( + "BUG: Deadlocked future missing from pending_instances. " + "Error output will not be created for this instance." + ) + progress.update(1) + continue + inst = pending_info.instance + error_output = self._create_error_output_with_metadata( inst, - TimeoutError( - f"Instance did not complete within " - f"{self.instance_timeout}s timeout" + RuntimeError( + f"Worker deadlock detected after " + f"{time_since_progress / 60:.1f} minutes " + f"of no progress" ), attempt, + pending_info.datapoint_id, ) - if error_output.metadata is None: - error_output.metadata = self.metadata.model_copy( - deep=True - ) - # metadata is guaranteed non-None after the above assignment - if self.metadata.lmnr is not None: - error_output.metadata.lmnr = LaminarEvalMetadata( - eval_id=self.metadata.lmnr.eval_id, - datapoint_id=pending_info.datapoint_id, - ) attempt_on_result(inst, error_output) - # Note: fut.cancel() only prevents unstarted futures from - # starting. Running workers will continue until pool shutdown. - fut.cancel() + progress.update(1) + pending.clear() progress.close() - # Normal completion - shutdown gracefully - pool.shutdown(wait=True) + # Shutdown pool - force terminate if we had timeouts/deadlocks. + # Force termination is necessary because zombie workers (stuck in + # deadlock or infinite loop) won't respond to graceful shutdown + # signals. Without force termination, the entire evaluation would + # hang waiting for workers that will never complete. + if timed_out_count > 0: + logger.warning( + f"{timed_out_count} instances timed out or deadlocked. " + f"Force terminating zombie workers." + ) + self._cleanup_pool(pool, futures, wait=False) + else: + pool.shutdown(wait=True) except KeyboardInterrupt: logger.warning("KeyboardInterrupt received, shutting down workers...") self._cleanup_pool(pool, futures, wait=False) diff --git a/tests/test_deadlock_detection.py b/tests/test_deadlock_detection.py new file mode 100644 index 000000000..d58eb63c2 --- /dev/null +++ b/tests/test_deadlock_detection.py @@ -0,0 +1,463 @@ +"""Tests for deadlock detection patterns used in the evaluation module. + +This module contains tests for the deadlock detection algorithm: + +1. TestDeadlockDetection: Core deadlock detection mechanism tests. + Verifies the timeout/progress tracking pattern triggers correctly. + +2. TestConfigurableTimeout: Environment variable configuration tests. + Verifies EVALUATION_NO_PROGRESS_TIMEOUT is read correctly. + +3. TestDeadlockPatterns: Pattern validation tests. + Verifies the complete deadlock handling pattern (detect, create error + outputs, force terminate) works correctly using the same algorithm + as benchmarks/utils/evaluation.py. + +IMPORTANT - Testing Scope Acknowledgment: +These tests validate the ALGORITHM/PATTERN, not the actual Evaluator class +from benchmarks.utils.evaluation. If someone refactors the Evaluator's +deadlock detection, these tests would still pass unchanged. This is an +accepted trade-off: full E2E integration testing requires complex +infrastructure (real datasets, runtimes, worker pools) that isn't practical +for unit tests. These tests verify the algorithm is correct; manual testing +or production monitoring must verify the implementation uses that algorithm. +""" + +import os +import time +from concurrent.futures import FIRST_COMPLETED, ProcessPoolExecutor, wait + +import pytest + + +def hanging_worker(instance_id: str) -> tuple[str, dict]: + """Simulate a worker that hangs indefinitely (deadlock simulation).""" + while True: + time.sleep(1) + + +def slow_but_completing_worker(instance_id: str, sleep_time: float) -> tuple[str, dict]: + """Worker that completes after a delay.""" + time.sleep(sleep_time) + return instance_id, {"status": "completed"} + + +class TestDeadlockDetection: + """Tests for the deadlock detection mechanism.""" + + def test_deadlock_detection_triggers_on_no_progress(self): + """Test that deadlock detection triggers when no progress is made.""" + # Use a very short timeout for testing (5 seconds) + no_progress_timeout = 5 + + with ProcessPoolExecutor(max_workers=2) as pool: + futures = [] + future_to_instance = {} + future_start_times = {} + + # Submit workers that will hang + for i in range(2): + fut = pool.submit(hanging_worker, f"hanging_{i}") + futures.append(fut) + future_to_instance[fut] = f"hanging_{i}" + future_start_times[fut] = time.monotonic() + + pending = set(futures) + last_progress_time = time.monotonic() + deadlock_detected = False + timed_out_count = 0 + + while pending and not deadlock_detected: + done, pending = wait(pending, timeout=1.0, return_when=FIRST_COMPLETED) + + for fut in done: + last_progress_time = time.monotonic() + + # Deadlock detection logic (mirrors evaluation.py) + now = time.monotonic() + time_since_progress = now - last_progress_time + + if pending and time_since_progress > no_progress_timeout: + deadlock_detected = True + timed_out_count = len(pending) + # Clear pending to exit loop + for fut in list(pending): + fut.cancel() + pending.clear() + + # Force terminate pool + pool.shutdown(wait=False, cancel_futures=True) + + assert deadlock_detected, "Deadlock should be detected" + assert timed_out_count == 2, f"Expected 2 timed out, got {timed_out_count}" + + def test_no_deadlock_when_progress_is_made(self): + """Test that deadlock detection does NOT trigger when progress is made.""" + no_progress_timeout = 5 + + with ProcessPoolExecutor(max_workers=2) as pool: + futures = [] + future_to_instance = {} + + # Submit workers that complete at different times + for i in range(3): + fut = pool.submit( + slow_but_completing_worker, f"worker_{i}", 0.5 + i * 0.5 + ) + futures.append(fut) + future_to_instance[fut] = f"worker_{i}" + + pending = set(futures) + last_progress_time = time.monotonic() + completed = [] + deadlock_detected = False + + while pending: + done, pending = wait(pending, timeout=1.0, return_when=FIRST_COMPLETED) + + for fut in done: + last_progress_time = time.monotonic() + instance_id, result = fut.result() + completed.append(instance_id) + + now = time.monotonic() + time_since_progress = now - last_progress_time + + if pending and time_since_progress > no_progress_timeout: + deadlock_detected = True + pending.clear() + + assert not deadlock_detected, "Deadlock should NOT be detected" + assert len(completed) == 3, "All workers should complete" + + def test_timed_out_count_increments_correctly(self): + """Test that timed_out_count is correctly incremented for deadlocked instances.""" + no_progress_timeout = 3 + + with ProcessPoolExecutor(max_workers=3) as pool: + futures = [] + future_to_instance = {} + + # 1 fast worker + 2 hanging workers + fut_fast = pool.submit(slow_but_completing_worker, "fast", 0.2) + futures.append(fut_fast) + future_to_instance[fut_fast] = "fast" + + for i in range(2): + fut = pool.submit(hanging_worker, f"hanging_{i}") + futures.append(fut) + future_to_instance[fut] = f"hanging_{i}" + + pending = set(futures) + last_progress_time = time.monotonic() + timed_out_count = 0 + completed = [] + + while pending: + done, pending = wait(pending, timeout=1.0, return_when=FIRST_COMPLETED) + + for fut in done: + last_progress_time = time.monotonic() + try: + instance_id, result = fut.result() + completed.append(instance_id) + except Exception: + pass + + now = time.monotonic() + time_since_progress = now - last_progress_time + + if pending and time_since_progress > no_progress_timeout: + deadlocked_count = len(pending) + timed_out_count += deadlocked_count + for fut in list(pending): + fut.cancel() + pending.clear() + + pool.shutdown(wait=False, cancel_futures=True) + + assert "fast" in completed, "Fast worker should complete" + assert timed_out_count == 2, f"Expected 2 deadlocked, got {timed_out_count}" + + def test_error_output_created_for_deadlocked_instances(self): + """Test that error outputs are created with correct metadata for deadlocked instances.""" + no_progress_timeout = 3 + error_outputs = [] + + with ProcessPoolExecutor(max_workers=2) as pool: + futures = [] + future_to_instance = {} + + for i in range(2): + fut = pool.submit(hanging_worker, f"instance_{i}") + futures.append(fut) + future_to_instance[fut] = f"instance_{i}" + + pending = set(futures) + last_progress_time = time.monotonic() + + while pending: + done, pending = wait(pending, timeout=1.0, return_when=FIRST_COMPLETED) + + for fut in done: + last_progress_time = time.monotonic() + + now = time.monotonic() + time_since_progress = now - last_progress_time + + if pending and time_since_progress > no_progress_timeout: + for fut in list(pending): + instance_id = future_to_instance[fut] + # Simulate creating error output like evaluation.py does + error_outputs.append( + { + "instance_id": instance_id, + "error": f"Worker deadlock detected after {time_since_progress / 60:.1f} minutes", + "time_since_progress": time_since_progress, + } + ) + fut.cancel() + pending.clear() + + pool.shutdown(wait=False, cancel_futures=True) + + assert len(error_outputs) == 2, "Should have 2 error outputs" + for output in error_outputs: + assert "deadlock" in output["error"].lower() + assert output["time_since_progress"] >= no_progress_timeout + + def test_pending_set_cleared_after_deadlock(self): + """Test that the pending set is properly cleared after deadlock detection.""" + no_progress_timeout = 3 + + with ProcessPoolExecutor(max_workers=2) as pool: + futures = [] + + for i in range(2): + fut = pool.submit(hanging_worker, f"instance_{i}") + futures.append(fut) + + pending = set(futures) + last_progress_time = time.monotonic() + pending_after_detection = None + + while pending: + done, pending = wait(pending, timeout=1.0, return_when=FIRST_COMPLETED) + + for fut in done: + last_progress_time = time.monotonic() + + now = time.monotonic() + time_since_progress = now - last_progress_time + + if pending and time_since_progress > no_progress_timeout: + for fut in list(pending): + fut.cancel() + pending.clear() + pending_after_detection = len(pending) + + pool.shutdown(wait=False, cancel_futures=True) + + assert pending_after_detection == 0, ( + "Pending set should be empty after deadlock" + ) + + +class TestConfigurableTimeout: + """Tests for the configurable no-progress timeout.""" + + def test_timeout_from_env_var(self): + """Test that EVALUATION_NO_PROGRESS_TIMEOUT env var is respected.""" + # Set a custom timeout + os.environ["EVALUATION_NO_PROGRESS_TIMEOUT"] = "10" + timeout_value = int(os.getenv("EVALUATION_NO_PROGRESS_TIMEOUT", "1800")) + assert timeout_value == 10 + + # Cleanup + del os.environ["EVALUATION_NO_PROGRESS_TIMEOUT"] + + def test_default_timeout(self): + """Test default timeout is 1800 seconds (30 minutes).""" + # Ensure env var is not set + if "EVALUATION_NO_PROGRESS_TIMEOUT" in os.environ: + del os.environ["EVALUATION_NO_PROGRESS_TIMEOUT"] + + timeout_value = int(os.getenv("EVALUATION_NO_PROGRESS_TIMEOUT", "1800")) + assert timeout_value == 1800 + + def test_invalid_env_var_handling(self): + """Test that invalid env var values are handled gracefully.""" + # This tests the pattern used in evaluation.py + os.environ["EVALUATION_NO_PROGRESS_TIMEOUT"] = "not_a_number" + + try: + timeout_value = int(os.getenv("EVALUATION_NO_PROGRESS_TIMEOUT", "1800")) + # Should not reach here + assert False, "Should have raised ValueError" + except ValueError: + # Expected - code should fall back to default + timeout_value = 1800 + + assert timeout_value == 1800 + + # Cleanup + del os.environ["EVALUATION_NO_PROGRESS_TIMEOUT"] + + def test_zero_timeout_falls_back_to_default(self): + """Test that zero timeout values fall back to default 1800s. + + Zero or negative timeouts would cause deadlock detection to fire + immediately, which is not useful. The implementation should fall + back to the default value. + """ + # Test the pattern: parse then validate + os.environ["EVALUATION_NO_PROGRESS_TIMEOUT"] = "0" + timeout_value = int(os.getenv("EVALUATION_NO_PROGRESS_TIMEOUT", "1800")) + + # Zero is invalid - should fall back to default + if timeout_value <= 0: + timeout_value = 1800 + + assert timeout_value == 1800 + del os.environ["EVALUATION_NO_PROGRESS_TIMEOUT"] + + def test_negative_timeout_falls_back_to_default(self): + """Test that negative timeout values fall back to default 1800s.""" + os.environ["EVALUATION_NO_PROGRESS_TIMEOUT"] = "-100" + timeout_value = int(os.getenv("EVALUATION_NO_PROGRESS_TIMEOUT", "1800")) + + # Negative is invalid - should fall back to default + if timeout_value <= 0: + timeout_value = 1800 + + assert timeout_value == 1800 + del os.environ["EVALUATION_NO_PROGRESS_TIMEOUT"] + + +class TestDeadlockPatterns: + """Tests that verify the deadlock detection pattern works correctly. + + These tests validate the deadlock detection algorithm (track last_progress_time, + check for no-progress timeout, create error outputs) using the same pattern as + benchmarks/utils/evaluation.py. They ensure the pattern itself is correct. + + Note: These are NOT integration tests of the Evaluator class. Full integration + testing of Evaluator requires complex setup (datasets, models, runtimes) that + is better suited for end-to-end tests. + """ + + def test_deadlock_pattern_creates_error_outputs(self, monkeypatch, tmp_path): + """Test that the deadlock detection pattern creates proper error outputs. + + Validates that when workers hang and no progress is made, the pattern: + 1. Detects the deadlock condition + 2. Creates appropriate error outputs for each deadlocked worker + 3. Properly cancels hanging futures + """ + # Use very short timeout for testing (3 seconds) + no_progress_timeout = 3 + error_outputs = [] + + with ProcessPoolExecutor(max_workers=2) as pool: + futures = [] + future_to_instance = {} + + # Create test instances and submit hanging workers + for i in range(2): + instance_id = f"test_{i}" + fut = pool.submit(hanging_worker, instance_id) + futures.append(fut) + future_to_instance[fut] = instance_id + + pending = set(futures) + last_progress_time = time.monotonic() + timed_out_count = 0 + + while pending: + done, pending = wait(pending, timeout=1.0, return_when=FIRST_COMPLETED) + + for fut in done: + last_progress_time = time.monotonic() + + now = time.monotonic() + time_since_progress = now - last_progress_time + + if pending and time_since_progress > no_progress_timeout: + # Deadlock detected - create error outputs like evaluation.py does + for fut in list(pending): + timed_out_count += 1 + instance_id = future_to_instance.get(fut) + if instance_id: + error_outputs.append( + { + "instance_id": instance_id, + "error": f"Worker deadlock detected after {time_since_progress / 60:.1f} minutes", + "test_result": None, # Failed due to deadlock + } + ) + fut.cancel() + pending.clear() + + pool.shutdown(wait=False, cancel_futures=True) + + # Verify error outputs were created for all deadlocked instances + assert len(error_outputs) == 2, ( + f"Expected 2 error outputs, got {len(error_outputs)}" + ) + for output in error_outputs: + assert "deadlock" in output["error"].lower(), ( + "Error should mention deadlock" + ) + assert output["test_result"] is None, ( + "Failed instances should have no test result" + ) + assert timed_out_count == 2, ( + f"Expected timed_out_count=2, got {timed_out_count}" + ) + + def test_deadlock_pattern_force_terminates(self, monkeypatch, tmp_path): + """Test that the deadlock pattern force terminates workers. + + Verifies that when deadlock is detected, the pattern properly: + 1. Cancels pending futures + 2. Clears the pending set + 3. Triggers pool shutdown without hanging + + This ensures the pattern won't cause infinite hangs. + """ + + # Use short timeout + monkeypatch.setenv("EVALUATION_NO_PROGRESS_TIMEOUT", "3") + + # Record start time + start_time = time.monotonic() + + with ProcessPoolExecutor(max_workers=2) as pool: + futures = [pool.submit(hanging_worker, f"test_{i}") for i in range(2)] + pending = set(futures) + last_progress_time = time.monotonic() + no_progress_timeout = 3 + + while pending: + done, pending = wait(pending, timeout=1.0, return_when=FIRST_COMPLETED) + + for fut in done: + last_progress_time = time.monotonic() + + now = time.monotonic() + if pending and now - last_progress_time > no_progress_timeout: + # Force terminate by canceling and clearing + for fut in list(pending): + fut.cancel() + pending.clear() + + # Force shutdown + pool.shutdown(wait=False, cancel_futures=True) + + elapsed = time.monotonic() - start_time + # Should complete within timeout + some buffer, not hang forever + assert elapsed < 15, f"Test took too long ({elapsed}s), possible hang" + + +if __name__ == "__main__": + pytest.main([__file__, "-v", "-s"])