diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..944b817 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +.idea/ +codexw-generic-bootstrap.yaml diff --git a/.pre-commit-hooks.yaml b/.pre-commit-hooks.yaml index 999e6ba..90c0bde 100644 --- a/.pre-commit-hooks.yaml +++ b/.pre-commit-hooks.yaml @@ -17,11 +17,36 @@ files: &sync_ai_rules_files (^\.cursor/rules/.*\.mdc$|^\.code_review/.*\.md$) pass_filenames: false -# Nobody should ever use these hooks in production. They're just for testing PRs in -# the duolingo/pre-commit-hooks repo more easily without having to tag and push -# temporary images to Docker Hub. Usage: edit a consumer repo's hook config to -# instead declare `id: duolingo-dev` or `id: sync-ai-rules-dev` and `rev: `, -# then run `pre-commit run --all-files` +- id: codex-review + name: Codex AI Code Review + description: On-demand AI code review using OpenAI Codex CLI. Requires codex CLI installed and authenticated. + entry: codex review + language: system + pass_filenames: false + stages: [manual] + verbose: true + +- id: codex-review-pr-grade + name: Codex AI Code Review (PR-grade) + description: Profile-aware multi-pass Codex review via ./codexw/__main__.py (auto-generates and auto-syncs local-review-profile.yaml). + entry: ./codexw/__main__.py review + language: script + pass_filenames: false + stages: [manual] + verbose: true + +- id: codexw + name: Codexw (alias) + description: Alias for codex-review-pr-grade. + entry: ./codexw/__main__.py review + language: script + pass_filenames: false + stages: [manual] + verbose: true + +# Dev hooks for testing PRs in the duolingo/pre-commit-hooks repo. 
+# Usage: edit a consumer repo's hook config to declare `id: duolingo-dev` +# and `rev: `, then run `pre-commit run --all-files` - id: duolingo-dev name: Duolingo (dev) entry: /entry @@ -34,3 +59,27 @@ language: docker files: *sync_ai_rules_files pass_filenames: false + +- id: codex-review-dev + name: Codex AI Code Review (dev) + entry: codex review + language: system + pass_filenames: false + stages: [manual] + verbose: true + +- id: codex-review-pr-grade-dev + name: Codex AI Code Review (PR-grade, dev) + entry: ./codexw/__main__.py review + language: script + pass_filenames: false + stages: [manual] + verbose: true + +- id: codexw-dev + name: Codexw (alias, dev) + entry: ./codexw/__main__.py review + language: script + pass_filenames: false + stages: [manual] + verbose: true diff --git a/Makefile b/Makefile index 103d265..ea72ed3 100644 --- a/Makefile +++ b/Makefile @@ -48,6 +48,8 @@ shell: # Runs tests .PHONY: test test: + echo "Running codexw tests..." + python3 test/codexw_test.py docker run --rm -v "$${PWD}/test:/test" "$$(docker build --network=host -q .)" sh -c \ 'cd /tmp \ && cp -r /test/before actual \ diff --git a/README.md b/README.md index 1615d81..e05d1b5 100644 --- a/README.md +++ b/README.md @@ -49,6 +49,240 @@ This hook synchronizes AI coding rules from `.cursor/rules/` and `.code_review/` This ensures all AI coding assistants stay aware of the same rules and coding conventions. +Execution model note: + +- `sync-ai-rules` is deterministic file generation, so it uses `language: docker_image` and a `files:` filter to run automatically when rule files change. +- Codex hooks are AI-review workflows (`codex review` / `codexw`) that may take longer and require local auth, so they run in `manual` stage by default and are triggered on demand. + +## Codex AI Code Review Hook (`codex-review`) + +On-demand AI code review using the OpenAI Codex CLI. This hook runs in `manual` stage by default, meaning it won't block normal commits. 
+ +**Prerequisites:** + +- Install Codex CLI: `brew install codex` or `npm install -g @openai/codex` +- Authenticate: `codex auth login` (uses Duolingo ChatGPT org credentials) + +**Usage:** + +```bash +# Run Codex review on staged changes +pre-commit run codex-review + +# Run on all files +pre-commit run codex-review --all-files +``` + +For direct CLI usage without pre-commit: + +```bash +codex review --uncommitted +codex review --base master +``` + +## Codex PR-grade Hook (`codex-review-pr-grade`, alias: `codexw`) + +Profile-aware multi-pass local review using `codexw`. This hook is also `manual` by default and does not block normal commits. + +It runs detailed PR-grade review from `local-review-profile.yaml`. +`codexw` also includes compatibility fallback for Codex CLI versions that reject prompt+target combinations. +`codexw` also includes recency-biased model fallback (latest 5 candidates, for example `gpt-5.3-codex` → `gpt-5.2-codex` → `gpt-5.1-codex` → `gpt-5-codex` → `gpt-4.2-codex`) and reasoning-effort fallback (`xhigh` → `high` → `medium` → `low`). +Canonical command is `./codexw/__main__.py review`; `./codexw/__main__.py review-pr` is kept as a compatibility alias. +If profile is missing, `codexw` auto-generates `local-review-profile.yaml` on first run. +On each run, `codexw` auto-syncs profile entries derived from repository signals (rules/domains/domain prompts) while preserving manual overrides. Stale auto-managed entries are pruned when source-of-truth changes. 
+ +PR-grade outputs include: + +- pass-level markdown reports +- combined markdown report (`combined-report.md`) +- machine-readable findings (`findings.json`) + +**Prerequisites:** + +- Install Codex CLI: `brew install codex` or `npm install -g @openai/codex` +- Authenticate: `codex auth login` +- Optional: pre-seed `local-review-profile.yaml` in target repo root (see example below) + +**Usage:** + +```bash +# Run PR-grade review for current diff vs profile default base branch +pre-commit run codex-review-pr-grade +pre-commit run codexw + +# Run PR-grade review for all files (still uses profile + pass orchestration) +pre-commit run codex-review-pr-grade --all-files +pre-commit run codexw --all-files +``` + +Direct execution (without pre-commit): + +```bash +./codexw/__main__.py review +./codexw/__main__.py review --base main +./codexw/__main__.py review --domains core,testing --no-fail-on-findings +# Create missing profile and exit +./codexw/__main__.py review --bootstrap-only +# Sync profile from repository signals and exit +./codexw/__main__.py review --sync-profile-only +# Validate profile loading only (no Codex run) +./codexw/__main__.py review --print-effective-profile +# Disable profile sync for one run +./codexw/__main__.py review --no-sync-profile +# Keep stale auto-managed profile entries for this run +./codexw/__main__.py review --no-prune-autogen +``` + +`review-pr` is an alias for `review` (kept for backward compatibility): + +```bash +./codexw/__main__.py review-pr --base master +``` + +### codexw CLI Flags Reference + +| Flag | Purpose | Default / Notes | +| --- | --- | --- | +| `--profile ` | Profile file to load/write. | Defaults to `local-review-profile.yaml` at repo root. | +| `--base ` | Review `branch...HEAD` diff. | Mutually exclusive with `--uncommitted` and `--commit`. Defaults to profile `review.default_base` when no target flag is passed. | +| `--uncommitted` | Review working tree changes. | Mutually exclusive target mode. 
Includes tracked and untracked files. | +| `--commit ` | Review a specific commit. | Mutually exclusive target mode. | +| `--domains ` | Restrict domain passes to selected domains. | Must be subset of profile `domains.allowed`. Defaults to profile `domains.default`. | +| `--depth-hotspots ` | Override hotspot depth pass count for this run. | Overrides profile `review.depth_hotspots`. | +| `--title ` | Pass custom title to `codex review`. | Optional metadata for review runs. | +| `--output-dir ` | Write artifacts to explicit output directory. | Defaults to `/`. | +| `--model ` | Requested model override for this run. | Used as preferred model; fallback chain may apply if unavailable. | +| `--print-effective-profile` | Print normalized effective profile and exit. | No review passes executed. | +| `--bootstrap-only` | Create missing profile (if needed) and exit. | No review passes executed. | +| `--sync-profile-only` | Sync profile from repo signals and exit. | No review passes executed. Cannot be combined with `--no-sync-profile`. | +| `--no-bootstrap-profile` | Disable automatic profile generation when missing. | Fails if profile file is absent. | +| `--no-sync-profile` | Disable sync from repository signals for this run. | Uses profile file as-is. | +| `--no-prune-autogen` | Keep stale auto-managed entries during sync for this run. | Sync still runs unless `--no-sync-profile` is set. | +| `--fail-on-findings` | Force strict gate (exit 2 when findings exist). | Mutually exclusive with `--no-fail-on-findings`. | +| `--no-fail-on-findings` | Advisory mode (do not fail on findings). | Mutually exclusive with `--fail-on-findings`. 
| + +### Common Flag Combos + +```bash +# Validate profile and inspect resolved settings (no Codex call) +./codexw/__main__.py review --profile local-review-profile.yaml --print-effective-profile + +# Advisory targeted review for local iteration +./codexw/__main__.py review --uncommitted --domains core,testing --no-fail-on-findings + +# Strict PR-grade run with explicit artifacts path +./codexw/__main__.py review --base master --fail-on-findings --output-dir .codex/review-runs/manual +``` + +### Sample Output for codexw-only Flags + +`--bootstrap-only` (missing profile): + +```text +$ ./codexw/__main__.py review --bootstrap-only +Generated local-review-profile.yaml from repository signals. Review and commit it. +Synchronized local-review-profile.yaml from repository signals. +warning: rule file 'AGENTS.md' not found +warning: rule pattern '.cursor/rules/**/*.mdc' matched no files +Profile ready: /local-review-profile.yaml +``` + +`--sync-profile-only`: + +```text +$ ./codexw/__main__.py review --sync-profile-only +warning: rule file 'AGENTS.md' not found +warning: rule pattern '.cursor/rules/**/*.mdc' matched no files +Profile ready: /local-review-profile.yaml +``` + +`--print-effective-profile`: + +```text +$ ./codexw/__main__.py review --print-effective-profile +warning: rule file 'AGENTS.md' not found +warning: rule pattern '.cursor/rules/**/*.mdc' matched no files +{ + "profile_path": "/local-review-profile.yaml", + "repo_root": "", + "effective_profile": { ...normalized profile JSON... } +} +``` + +`--no-bootstrap-profile` (profile missing): + +```text +$ ./codexw/__main__.py review --no-bootstrap-profile --print-effective-profile +error: profile not found: /local-review-profile.yaml. Add local-review-profile.yaml or pass --profile. 
+``` + +`--sync-profile-only` with `--no-sync-profile` (invalid combination): + +```text +$ ./codexw/__main__.py review --sync-profile-only --no-sync-profile +error: --sync-profile-only cannot be combined with --no-sync-profile +``` + +`local-review-profile.yaml` schema (minimum practical shape): + +```yaml +version: 1 + +repo: + name: Repo Name + +review: + default_base: main + strict_gate: true + depth_hotspots: 3 + output_root: .codex/review-runs + +rules: + include: + - AGENTS.md + - .cursor/rules/**/*.mdc + +domains: + default: [core] + allowed: [core, testing] + +prompts: + global: | + Additional repo-wide review context. + by_domain: + testing: | + Additional testing-specific context. + +pipeline: + include_policy_pass: true + include_core_passes: true + include_domain_passes: true + include_depth_passes: true + policy_instructions: | + Custom policy pass instructions. + core_passes: + - id: core-breadth + name: Core breadth + instructions: | + Custom breadth pass instructions. + depth_instructions: | + Task: + - Perform depth-first review of hotspot file: {hotspot} +``` + +Reference profiles: + +- `codexw/local-review-profile.repo.example.yaml` (generic template) +- `codexw/local-review-profile.duolingo-android.example.yaml` (concrete Duolingo Android example) + +Feature/use-case guide: + +- `codexw/features-and-usecases.md` +- `codexw/architecture.md` (internal architecture) + +Hook id for pre-commit: +`codex-review-pr-grade` (canonical), `codexw` (alias) + ## Usage Repo maintainers can declare these hooks in `.pre-commit-config.yaml`: @@ -68,6 +302,12 @@ Repo maintainers can declare these hooks in `.pre-commit-config.yaml`: - --scala-version=3 # Defaults to Scala 2.12 # Sync AI rules hook (for repos with Cursor AI rules) - id: sync-ai-rules + # On-demand Codex AI code review (manual stage, requires codex CLI) + - id: codex-review + # On-demand PR-grade Codex review (manual stage, profile-aware) + - id: codex-review-pr-grade + # Equivalent alias for 
PR-grade Codex review (manual stage, profile-aware) + - id: codexw ``` Directories named `build` and `node_modules` are excluded by default - no need to declare them in the hook's `exclude` key. diff --git a/codexw/__init__.py b/codexw/__init__.py new file mode 100644 index 0000000..4e61054 --- /dev/null +++ b/codexw/__init__.py @@ -0,0 +1,31 @@ +"""Codexw: Profile-aware Codex PR-grade review wrapper. + +Public API: + - CodexwError: Base exception type + - PassSpec: Data class for review pass specification + - build_bootstrap_profile: Build initial profile from repo signals + - default_domain_prompt_template: Generic per-domain prompt template + - normalize_profile: Normalize raw profile dict + - load_profile: Load profile from file + - write_profile: Write profile to file +""" + +from .passes import PassSpec +from .profile import ( + build_bootstrap_profile, + default_domain_prompt_template, + load_profile, + normalize_profile, + write_profile, +) +from .utils import CodexwError + +__all__ = [ + "CodexwError", + "PassSpec", + "build_bootstrap_profile", + "default_domain_prompt_template", + "load_profile", + "normalize_profile", + "write_profile", +] diff --git a/codexw/__main__.py b/codexw/__main__.py new file mode 100755 index 0000000..e9c227c --- /dev/null +++ b/codexw/__main__.py @@ -0,0 +1,346 @@ +#!/usr/bin/env python3 +"""Codexw: Profile-aware Codex PR-grade review wrapper. + +This is the main entry point for codexw. It orchestrates: +1. Profile loading and synchronization +2. Git change detection +3. Pass execution via Codex CLI +4. 
Report generation + +Usage: + python -m codexw review --base master + python -m codexw review --uncommitted + ./codexw/__main__.py review +""" + +from __future__ import annotations + +import datetime as dt +import json +import os +import sys +from pathlib import Path + +if __package__ in {None, ""}: + # Support direct script execution: + # ./codexw/__main__.py review + repo_root_for_imports = Path(__file__).resolve().parents[1] + repo_root_str = str(repo_root_for_imports) + if repo_root_str not in sys.path: + sys.path.insert(0, repo_root_str) + + from codexw.cli import build_parser + from codexw.git import ( + changed_modules, + collect_changed_files, + collect_numstat, + find_repo_root, + resolve_base_ref, + ) + from codexw.passes import PassBuilder, PassRunner + from codexw.profile import ( + build_bootstrap_profile, + discover_rule_files, + load_profile, + normalize_profile, + sync_profile_with_repo, + validate_rule_patterns, + write_profile, + ) + from codexw.reporting import ( + write_combined_report, + write_empty_report, + write_findings_json, + write_support_files, + ) + from codexw.utils import CodexwError, shutil_which +else: + from .cli import build_parser + from .git import ( + changed_modules, + collect_changed_files, + collect_numstat, + find_repo_root, + resolve_base_ref, + ) + from .passes import PassBuilder, PassRunner + from .profile import ( + build_bootstrap_profile, + discover_rule_files, + load_profile, + normalize_profile, + sync_profile_with_repo, + validate_rule_patterns, + write_profile, + ) + from .reporting import ( + write_combined_report, + write_empty_report, + write_findings_json, + write_support_files, + ) + from .utils import CodexwError, shutil_which + + +def run_review(args) -> int: + """Execute the review workflow.""" + repo_root = find_repo_root(Path.cwd()) + os.chdir(repo_root) + + # Resolve profile path + profile_path = Path(args.profile or "local-review-profile.yaml") + if not profile_path.is_absolute(): + profile_path = 
repo_root / profile_path + + # Bootstrap profile if missing + if not profile_path.exists(): + if args.no_bootstrap_profile: + raise CodexwError( + f"profile not found: {profile_path}. " + "Add local-review-profile.yaml or pass --profile." + ) + bootstrap_profile = build_bootstrap_profile(repo_root) + write_profile(profile_path, bootstrap_profile) + try: + profile_display = str(profile_path.relative_to(repo_root)) + except ValueError: + profile_display = str(profile_path) + print( + f"Generated {profile_display} from repository signals. Review and commit it.", + file=sys.stderr, + ) + + if not profile_path.exists(): + raise CodexwError(f"profile not found: {profile_path}") + + # Validate sync options + if args.sync_profile_only and args.no_sync_profile: + raise CodexwError("--sync-profile-only cannot be combined with --no-sync-profile") + + # Load and sync profile + raw_profile = load_profile(profile_path) + if args.no_sync_profile: + synced_profile = raw_profile + else: + synced_profile, was_updated = sync_profile_with_repo( + raw_profile, + repo_root, + prune_autogen=not args.no_prune_autogen, + ) + if was_updated: + write_profile(profile_path, synced_profile) + try: + profile_display = str(profile_path.relative_to(repo_root)) + except ValueError: + profile_display = str(profile_path) + print( + f"Synchronized {profile_display} from repository signals.", + file=sys.stderr, + ) + + profile = normalize_profile(synced_profile) + + # Validate rule patterns + resolved_patterns, warnings = validate_rule_patterns(repo_root, profile["rule_patterns"]) + for warning in warnings: + print(f"warning: {warning}", file=sys.stderr) + profile["rule_patterns"] = resolved_patterns + + # Handle print-effective-profile + if args.print_effective_profile: + print( + json.dumps( + { + "profile_path": str(profile_path), + "repo_root": str(repo_root), + "effective_profile": profile, + }, + indent=2, + sort_keys=True, + ) + ) + return 0 + + # Handle bootstrap/sync-only modes + if 
args.bootstrap_only or args.sync_profile_only: + print(f"Profile ready: {profile_path}") + return 0 + + # Verify codex CLI is available + if not shutil_which("codex"): + raise CodexwError("codex CLI not found in PATH") + + # Determine review mode + mode = "base" + base_branch = args.base or profile["default_base"] + commit_sha = args.commit or "" + if args.uncommitted: + mode = "uncommitted" + elif args.commit: + mode = "commit" + elif mode == "base": + base_branch = resolve_base_ref(repo_root, base_branch) + + # Determine gating mode + fail_on_findings = profile["strict_gate"] + if args.fail_on_findings: + fail_on_findings = True + if args.no_fail_on_findings: + fail_on_findings = False + + # Determine depth hotspots + depth_hotspots = ( + args.depth_hotspots if args.depth_hotspots is not None else profile["depth_hotspots"] + ) + + # Validate domains + allowed_domains = profile["allowed_domains"] + default_domains = profile["default_domains"] + if args.domains: + selected_domains = [d.strip() for d in args.domains.split(",") if d.strip()] + else: + selected_domains = list(default_domains) + + unknown = [d for d in selected_domains if d not in allowed_domains] + if unknown: + raise CodexwError( + f"invalid domain(s): {', '.join(unknown)}. 
Allowed: {', '.join(allowed_domains)}" + ) + + # Setup output directory + ts = dt.datetime.now().strftime("%Y%m%d-%H%M%S") + output_root = Path(args.output_dir) if args.output_dir else Path(profile["output_root"]) / ts + if not output_root.is_absolute(): + output_root = repo_root / output_root + output_root.mkdir(parents=True, exist_ok=True) + + # Build target arguments for codex CLI + target_args: list[str] = [] + if mode == "base": + target_args += ["--base", base_branch] + target_desc = f"base branch: {base_branch}" + elif mode == "uncommitted": + target_args += ["--uncommitted"] + target_desc = "uncommitted changes" + else: + target_args += ["--commit", commit_sha] + target_desc = f"commit: {commit_sha}" + + if args.title: + target_args += ["--title", args.title] + + model_override = args.model or "" + + # Discover rule files + rule_files = discover_rule_files(repo_root, profile["rule_patterns"]) + + # Collect changed files + changed_files = collect_changed_files(repo_root, mode, base_branch, commit_sha) + modules = changed_modules(changed_files) + + # Collect hotspots + numstat = collect_numstat(repo_root, mode, base_branch, commit_sha) + hotspots = [path for _, path in numstat[:depth_hotspots] if depth_hotspots > 0] + + # Handle empty diff + if not changed_files: + combined_report = output_root / "combined-report.md" + write_empty_report(combined_report, profile, target_desc, selected_domains) + print("No files detected for selected target.") + print(f"Combined report: {combined_report}") + return 0 + + # Build passes + pass_builder = PassBuilder( + profile=profile, + rule_files=rule_files, + changed_files=changed_files, + modules=modules, + hotspots=hotspots, + selected_domains=selected_domains, + ) + passes = pass_builder.build_passes() + + if not passes: + raise CodexwError("no review passes configured; check profile.pipeline settings") + + # Run passes + pass_runner = PassRunner( + repo_root=repo_root, + output_root=output_root, + target_args=target_args, 
+ target_desc=target_desc, + model_override=model_override or None, + ) + summary_lines, raw_findings, executed_pass_files = pass_runner.run_all(passes) + + # Write support files + write_support_files( + output_root=output_root, + rule_files=rule_files, + changed_files=changed_files, + modules=modules, + hotspots=hotspots, + summary_lines=summary_lines, + ) + + # Write findings JSON + findings_json = output_root / "findings.json" + write_findings_json(findings_json, target_desc, raw_findings) + + # Write combined report + combined_report = output_root / "combined-report.md" + write_combined_report( + path=combined_report, + profile=profile, + profile_path=profile_path, + repo_root=repo_root, + target_desc=target_desc, + selected_domains=selected_domains, + rule_files=rule_files, + changed_files=changed_files, + modules=modules, + hotspots=hotspots, + depth_hotspots=depth_hotspots, + pass_count=len(passes), + summary_lines=summary_lines, + raw_findings=raw_findings, + findings_json_path=findings_json, + executed_pass_files=executed_pass_files, + title=args.title, + model_override=model_override, + ) + + print("\nDone.") + print(f"Per-pass outputs: {output_root}") + print(f"Combined report: {combined_report}") + + if raw_findings: + print("Status: active findings detected.") + if fail_on_findings: + print("Exiting non-zero because fail-on-findings is enabled.", file=sys.stderr) + return 2 + else: + print("Status: no active findings in executed passes.") + + return 0 + + +def main() -> int: + """Main entry point.""" + parser = build_parser() + args = parser.parse_args() + + try: + if args.command in {"review", "review-pr"}: + return run_review(args) + + parser.print_help() + return 1 + except CodexwError as exc: + print(f"error: {exc}", file=sys.stderr) + return exc.code + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/codexw/architecture.md b/codexw/architecture.md new file mode 100644 index 0000000..0ec5ecb --- /dev/null +++ 
b/codexw/architecture.md @@ -0,0 +1,258 @@ +# Codexw Architecture + +This document describes the internal architecture of the `codexw` package — a profile-aware, multi-pass Codex CLI wrapper for local PR-grade code review. + +## Module Overview + +``` +┌──────────────────────────────────────────────────────────────┐ +│ __main__.py (orchestrator) │ +│ run_review() → build passes → run passes → write reports │ +└─────────────────────────────┬────────────────────────────────┘ + │ + ┌─────────────────────┼─────────────────────┐ + │ │ │ + ▼ ▼ ▼ + ┌─────────┐ ┌──────────┐ ┌───────────┐ + │ git.py │ │ passes.py│ │ profile.py│ + │ (changes)│ │ (execute)│ │ (config) │ + └─────────┘ └────┬─────┘ └───────────┘ + │ + ┌──────────────┼──────────────┐ + │ │ │ + ▼ ▼ ▼ + ┌──────────┐ ┌───────────┐ ┌───────────────┐ + │ RetryStr │ │ ModelFall │ │ PassBuilder │ + │ ategy │ │ backState │ │ PassRunner │ + └──────────┘ └───────────┘ └───────────────┘ +``` + +| Module | Responsibility | +| ------------------- | ------------------------------------- | +| `__init__.py` | Public API exports | +| `__main__.py` | Entry point, orchestration | +| `cli.py` | Argument parsing | +| `constants.py` | Default configs and constants | +| `utils.py` | Shared helpers, `CodexwError` | +| `git.py` | Git operations (changes, numstat) | +| `profile.py` | Profile load/sync/normalize/write | +| `passes.py` | Pass execution, model/effort fallback | +| `prompts.py` | Prompt construction | +| `finding_parser.py` | Extract findings from output | +| `reporting.py` | Write reports and artifacts | +| `yaml_fallback.py` | Fallback YAML parser (no PyYAML) | +| `yaml_writer.py` | Fallback YAML writer (no PyYAML) | + +## Data Flow + +``` +┌─────────────────┐ +│ CLI Arguments │ +│ (--base, etc) │ +└────────┬────────┘ + │ + ▼ +┌─────────────────┐ ┌─────────────────┐ +│ Profile File │────▶│ normalize_ │ +│ (YAML/JSON) │ │ profile() │ +└─────────────────┘ └────────┬────────┘ + │ + ┌───────────────────────┘ + ▼ 
+┌─────────────────┐ ┌─────────────────┐ +│ Git: collect │────▶│ PassBuilder │ +│ changed files │ │ .build_passes()│ +└─────────────────┘ └────────┬────────┘ + │ + ▼ + ┌─────────────────┐ + │ PassRunner │ + │ .run_all() │ + └────────┬────────┘ + │ + ┌───────────────────────┼───────────────────────┐ + │ │ │ + ▼ ▼ ▼ +┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ +│ codex review │ │ parse_findings │ │ write_combined │ +│ (per pass) │────▶│ _from_pass() │────▶│ _report() │ +└─────────────────┘ └─────────────────┘ └─────────────────┘ +``` + +## Key Classes + +### PassSpec (dataclass, frozen) + +Immutable specification for a single review pass. + +```python +@dataclass(frozen=True) +class PassSpec: + id: str # Unique ID for filenames (e.g., "pass-1-policy-sweep") + name: str # Human-readable name (e.g., "Policy: full standards sweep") + prompt: str # Full prompt to send to Codex CLI +``` + +### ModelFallbackState (dataclass, mutable) + +Shared state across passes for model/effort resolution reuse. + +```python +@dataclass +class ModelFallbackState: + preferred_model: str | None = None # User-requested model + selected_model: str | None = None # Resolved working model + selected_effort: str | None = None # Resolved working effort +``` + +### PassBuilder + +Constructs `PassSpec` objects from profile configuration. + +- Reads pipeline config (policy/core/domain/depth pass toggles) +- Builds prompt from base rubric + rules + diff context + pass-specific instructions +- Returns `list[PassSpec]` + +### PassRunner + +Executes passes and collects results. 
+ +- Iterates through `PassSpec` list +- Calls `run_review_pass_with_compat()` for each +- Parses findings, builds summary +- Returns `(summary_lines, raw_findings)` + +### RetryStrategy + +Static methods for detecting retryable error conditions: + +| Method | Detects | +| -------------------------------- | ---------------------------------- | +| `should_retry_with_compat()` | Prompt+target flag incompatibility | +| `model_unavailable()` | Missing/inaccessible model | +| `reasoning_effort_unsupported()` | Invalid reasoning effort level | + +## Resilience Strategy + +### Model Fallback Chain + +When `model_unavailable()` is detected, codexw walks a **recency-biased predecessor chain**: + +``` +gpt-5.3-codex → gpt-5.2-codex → gpt-5.1-codex → gpt-5-codex → gpt-4.2-codex + ↑ ↑ ↑ ↑ ↑ + same-major predecessors │ prior-major probes + └── (limited to 5 models total) +``` + +**Policy rationale:** + +- Recent models are more likely to be available +- Avoids drifting into obsolete model tails +- 5-model window prevents runaway fallback + +### Reasoning Effort Fallback + +When `reasoning_effort_unsupported()` is detected, codexw downgrades effort: + +``` +xhigh → high → medium → low → minimal +``` + +**Detection signals:** + +- Structured JSON error with `param: "reasoning.effort"` +- Error message mentioning `model_reasoning_effort` +- "unsupported", "not supported", "invalid value" keywords + +### State Persistence + +Once a working model+effort pair is found, `ModelFallbackState` preserves it for subsequent passes in the same run. This avoids repeated fallback overhead. 
+ +## Pass Pipeline + +Default pipeline runs 4 pass categories: + +| Category | Purpose | +| ------------------- | -------------------------------------------- | +| **Policy** | Enforce all discovered rule files | +| **Core** (4 passes) | Breadth → Regressions → Architecture → Tests | +| **Domain** | Per-domain focused review | +| **Depth** | Hotspot files (top N by churn) | + +Each pass type can be toggled via `pipeline.include_*` flags. + +## Profile Sync + +On each run (unless `--no-sync-profile`), codexw: + +1. Infers signals from repository (rules, domains, prompts) +2. Merges with existing profile, preserving manual edits +3. Prunes stale auto-managed entries +4. Writes updated profile back + +Auto-managed entries are tracked in `profile_meta.autogen` to distinguish them from manual edits. + +## Extension Points + +### Adding a New Pass Type + +1. Add toggle in `constants.py` (e.g., `include_security_pass`) +2. Extend `PassBuilder.build_passes()` to construct new `PassSpec` +3. No changes needed in `PassRunner` — it executes any `PassSpec` + +### Adding a New Fallback Strategy + +1. Add detection method to `RetryStrategy` +2. Extend `run_review_pass_with_fallback()` to check and handle it +3. Add corresponding test case + +### Adding a New Profile Field + +1. Add default in `constants.py` +2. Handle in `normalize_profile()` (profile.py) +3. Handle in `sync_profile_with_repo()` if auto-synced +4. 
Update example profiles + +## Testing + +Primary test file: `test/codexw_test.py` + +| Category | Coverage | +| -------------------- | -------------------------------------------- | +| YAML parsing | Flow lists, comments, nulls, quotes | +| Profile bootstrap | Domain inference, generic prompts | +| Model fallback | Chain construction, retry behavior | +| Effort fallback | Detection, downgrade sequence | +| State persistence | Cross-pass reuse | +| Error classification | Structured/unstructured retry signal parsing | + +Canonical local verification: + +```bash +# Targeted codexw unit coverage +python3 test/codexw_test.py -q + +# Python syntax sanity for codexw modules +python3 -m py_compile codexw/*.py + +# Full repository validation (includes sync-ai-rules + formatting checks) +make test +``` + +Optional equivalent unit command: + +```bash +python3 -m pytest test/codexw_test.py -v +``` + +## Dependencies + +**Required:** + +- Python 3.9+ +- `codex` CLI in PATH + +**Optional:** + +- `PyYAML` (for full YAML support; fallback parser handles common cases) diff --git a/codexw/cli.py b/codexw/cli.py new file mode 100644 index 0000000..8396cca --- /dev/null +++ b/codexw/cli.py @@ -0,0 +1,87 @@ +"""CLI argument parsing for codexw. + +This module handles command-line argument parsing. +Keeps the main entry point clean. 
+""" + +from __future__ import annotations + +import argparse + + +def build_parser() -> argparse.ArgumentParser: + """Build the argument parser for codexw.""" + parser = argparse.ArgumentParser( + prog="codexw", + description="Generic, profile-aware Codex wrapper for local PR-grade review.", + ) + sub = parser.add_subparsers(dest="command") + + review = sub.add_parser( + "review", + help="Run profile-driven PR-grade multi-pass review.", + ) + review_pr = sub.add_parser( + "review-pr", + help="Alias for 'review' (kept for backward compatibility).", + ) + + def add_review_args(target_parser: argparse.ArgumentParser) -> None: + target_parser.add_argument( + "--profile", help="Path to local-review-profile.yaml", default=None + ) + mode = target_parser.add_mutually_exclusive_group() + mode.add_argument("--base", help="Base branch", default=None) + mode.add_argument("--uncommitted", action="store_true", help="Review uncommitted changes") + mode.add_argument("--commit", help="Review a specific commit SHA", default=None) + target_parser.add_argument("--domains", help="Comma-separated domain list", default=None) + target_parser.add_argument( + "--depth-hotspots", type=int, help="Number of hotspot depth passes" + ) + target_parser.add_argument("--title", help="Optional review title", default=None) + target_parser.add_argument( + "--output-dir", help="Output directory for artifacts", default=None + ) + target_parser.add_argument("--model", help="Optional model override", default=None) + target_parser.add_argument( + "--print-effective-profile", + action="store_true", + help="Print normalized profile and exit (no review execution)", + ) + target_parser.add_argument( + "--bootstrap-only", + action="store_true", + help="Create missing profile (if needed) and exit", + ) + target_parser.add_argument( + "--sync-profile-only", + action="store_true", + help="Sync profile from repository signals and exit", + ) + target_parser.add_argument( + "--no-bootstrap-profile", + 
action="store_true", + help="Disable automatic profile generation when missing", + ) + target_parser.add_argument( + "--no-sync-profile", + action="store_true", + help="Disable automatic profile sync from repository signals", + ) + target_parser.add_argument( + "--no-prune-autogen", + action="store_true", + help="Keep stale auto-managed profile entries for this run", + ) + gate_mode = target_parser.add_mutually_exclusive_group() + gate_mode.add_argument("--fail-on-findings", action="store_true", help="Force strict gate") + gate_mode.add_argument( + "--no-fail-on-findings", + action="store_true", + help="Exploratory mode; do not fail when findings exist", + ) + + add_review_args(review) + add_review_args(review_pr) + + return parser diff --git a/codexw/constants.py b/codexw/constants.py new file mode 100644 index 0000000..c4539c8 --- /dev/null +++ b/codexw/constants.py @@ -0,0 +1,128 @@ +"""Constants and default configurations for codexw. + +This module contains all magic numbers, default pass specifications, and +sentinel values used throughout the codexw package. Centralizing these +makes the codebase easier to understand and maintain. +""" + +from __future__ import annotations + +# Sentinel string that Codex outputs when no actionable findings exist. +# Used for pass success/failure detection. +NO_FINDINGS_SENTINEL = "No actionable findings." + +# Default global prompt injected into all review passes. +# Provides baseline review guidance without repo-specific context. +DEFAULT_GLOBAL_PROMPT = ( + "Use repository standards for lifecycle, state, architecture boundaries, and " + "production-safety. Prioritize behavior-changing issues and policy violations " + "over style-only comments." +) + +# Policy pass instructions template. +# The policy pass enforces all discovered rule files and outputs coverage. 
+DEFAULT_POLICY_PASS_INSTRUCTIONS = ( + "Task:\n" + "- Enforce every standard file listed above.\n" + "- Output a 'Rule Coverage' section with one line per rule file:\n" + " :: Covered | NotApplicable :: short reason\n" + "- Then output actionable findings using the required schema.\n" + f"- If no actionable findings exist, include exactly this line: {NO_FINDINGS_SENTINEL}" +) + +# Core passes run for the "core" domain. Each pass focuses on a different +# risk class to improve recall and reduce blind spots. +# +# Rationale for 4 passes: +# 1. core-breadth: Ensures every changed file is touched at least once. +# Catches obvious issues across the entire diff surface. +# 2. core-regressions: Focuses on behavioral changes, crashes, and security. +# These are the highest-impact bugs that escape code review. +# 3. core-architecture: Focuses on structural issues - boundaries, concurrency, +# lifecycle. Harder to spot but cause long-term maintenance pain. +# 4. core-tests: Focuses on test coverage gaps. Missing tests allow future +# regressions to slip through. 
+DEFAULT_CORE_PASS_SPECS: list[dict[str, str]] = [ + { + "id": "core-breadth", + "name": "Core 1: breadth coverage across all changed files", + "instructions": ( + "Task:\n" + "- Perform full-breadth review across every changed file listed above.\n" + "- Output a 'Breadth Coverage' section with one line per changed file:\n" + " :: Reviewed | NotApplicable :: short reason\n" + "- Then output actionable findings using the required schema.\n" + f"- If no actionable findings exist, include exactly this line: {NO_FINDINGS_SENTINEL}" + ), + }, + { + "id": "core-regressions", + "name": "Core 2: regressions/security/crash scan", + "instructions": ( + "Focus areas:\n" + "- behavioral regressions\n" + "- crash/nullability risks\n" + "- state corruption and data-loss risks\n" + "- security and privacy issues" + ), + }, + { + "id": "core-architecture", + "name": "Core 3: architecture/concurrency scan", + "instructions": ( + "Focus areas:\n" + "- architecture boundaries and dependency misuse\n" + "- lifecycle and concurrency/threading issues\n" + "- error-handling/fallback correctness\n" + "- protocol/contract boundary failures" + ), + }, + { + "id": "core-tests", + "name": "Core 4: test-coverage scan", + "instructions": ( + "Focus areas:\n" + "- missing tests required to protect the change\n" + "- high-risk edge cases without coverage\n" + "- regressions likely to escape without tests" + ), + }, +] + +# Depth pass instructions template. +# {hotspot} is replaced with the actual file path at runtime. +DEFAULT_DEPTH_PASS_INSTRUCTIONS = ( + "Task:\n" + "- Perform depth-first review of hotspot file: {hotspot}\n" + "- Traverse directly related changed call paths\n" + "- Prioritize subtle behavioral, concurrency, state, and boundary-condition failures\n" + "- Output only actionable findings with required schema\n" + f"- If no actionable findings exist, include exactly this line: {NO_FINDINGS_SENTINEL}" +) + +# Default review configuration values. 
+DEFAULT_BASE_BRANCH = "main" +DEFAULT_DEPTH_HOTSPOTS = 3 +DEFAULT_OUTPUT_ROOT = ".codex/review-runs" +DEFAULT_STRICT_GATE = True + +# Default rule patterns to search for when no profile exists. +DEFAULT_RULE_PATTERNS = ("AGENTS.md", ".cursor/rules/**/*.mdc") + +# Model fallback and compatibility handling constants. +# These are used by pass execution retry logic. +REASONING_EFFORT_ORDER = ("xhigh", "high", "medium", "low", "minimal") +DEFAULT_MODEL_FALLBACK_WINDOW = 5 +PREVIOUS_MAJOR_MINOR_CANDIDATES = (2, 1) +REASONING_PARAM_HINTS = {"model_reasoning_effort", "reasoning.effort", "reasoning_effort"} +MODEL_UNAVAILABLE_CODE_HINTS = { + "model_not_found", + "unknown_model", + "model_unavailable", + "unsupported_model", + "model_not_supported", +} +COMPAT_CODE_HINTS = { + "unsupported_option_combination", + "unsupported_argument_combination", +} diff --git a/codexw/features-and-usecases.md b/codexw/features-and-usecases.md new file mode 100644 index 0000000..7d18166 --- /dev/null +++ b/codexw/features-and-usecases.md @@ -0,0 +1,89 @@ +# Codexw Features + Use Cases + +This document describes the local review capabilities provided by `codexw` and associated pre-commit hooks. + +## Review Paths + +1. **Quick review (`codex-review`)** + Runs plain `codex review` from manual pre-commit stage for fast local sanity checks before push. + Why it matters: lowest-latency feedback path. + +2. **PR-grade review (`codex-review-pr-grade`, alias `codexw`)** + Runs `./codexw/__main__.py review` with profile-driven multi-pass orchestration. + Why it matters: deeper and more consistent review than one-shot prompts. + +## Profile + Policy Controls + +3. **Rule pattern validation at startup** + Resolves `rules.include` entries and warns on missing/unmatched patterns; invalid patterns are dropped for that run. + Why it matters: prevents false assumptions that stale rules are being enforced. + +4. 
**Profile bootstrap + sync** (`--bootstrap-only`, `--sync-profile-only`) + Auto-generates missing profile and syncs repository-derived signals (rules/domains/prompts) while preserving manual edits. + Why it matters: reduces manual profile drift across repos. + +5. **Sync safety toggles** (`--no-sync-profile`, `--no-prune-autogen`) + Allows freezing sync behavior for debugging or controlled rollouts. + Why it matters: makes behavior changes diagnosable and reversible. + +6. **Effective profile inspection** (`--print-effective-profile`) + Prints normalized runtime profile and exits without running review passes. + Why it matters: configuration is inspectable before expensive execution. + +## Execution Scope + Quality Depth + +7. **Target scope control** (`--base`, `--uncommitted`, `--commit`) + Limits review to the intended change window: branch diff, dirty state, or specific commit. + Why it matters: less noise, higher relevance. + +8. **Domain-focused passes** (`--domains core,testing`) + Runs only selected domain passes and domain prompts for targeted work. + Why it matters: runtime budget is focused on high-value domains. + +9. **Multi-pass pipeline** (policy/core/domain/depth) + Executes specialized passes instead of one flat prompt to improve breadth + depth. + Why it matters: better recall across different risk classes. + +10. **Hotspot depth analysis** (`--depth-hotspots`) + Uses churn-derived hotspots for extra depth passes on high-risk files. + Why it matters: additional scrutiny where defects are more likely. + +11. **Gating modes** (`--fail-on-findings`, `--no-fail-on-findings`) + Supports strict non-zero gate or advisory mode for earlier iteration. + Why it matters: same tool works across dev and pre-merge stages. + +## Resilience + Portability + +12. **Fallback YAML parser/writer** (no `PyYAML` required) + Profile read/write still works without `PyYAML`; includes coverage for flow lists, comments, nulls, and quoted scalars. 
+    Why it matters: workflow remains portable across minimal environments.
+
+13. **CLI compatibility retry**
+    If the CLI rejects a prompt+target combination, the wrapper retries through a compatible invocation path.
+    Why it matters: fewer environment-specific hard failures.
+
+14. **Model fallback (recency-biased)**
+    If a model is unavailable, fallback is attempted within a fixed window of the 5 most recent models relative to the originally requested model (for example `gpt-5.3-codex -> gpt-5.2-codex -> gpt-5.1-codex -> gpt-5-codex -> gpt-4.2-codex`).
+    Why it matters: avoids drifting into obsolete model tails.
+
+15. **Reasoning-effort fallback**
+    If the configured reasoning effort is unsupported, effort is downgraded step by step (`xhigh -> high -> medium -> low -> minimal`) using structured error signals and tolerant parsing.
+    Why it matters: resilient behavior across CLI/API message format changes.
+
+16. **Resolved model/effort reuse within run**
+    Once a working pair is found, subsequent passes reuse it in the same review run.
+    Why it matters: reduces repeated failure/retry overhead.
+
+## Outputs + Integration
+
+17. **Structured artifacts**
+    Emits per-pass markdown, combined report, findings JSON, and support files (`changed-files`, `modules`, `hotspots`, `enforced rules`, `pass status`).
+    Why it matters: supports human triage plus machine automation.
+
+18. **Wrapper orchestration over plain `codex review`**
+    Adds pass planning, prompt composition, rule injection, finding parsing, reporting, and gating around the same backend CLI engine.
+    Why it matters: local review quality approaches dedicated PR-review workflows.
+
+---
+
+For internal architecture details, see [architecture.md](./architecture.md).
diff --git a/codexw/finding_parser.py b/codexw/finding_parser.py
new file mode 100644
index 0000000..79a371a
--- /dev/null
+++ b/codexw/finding_parser.py
@@ -0,0 +1,119 @@
+"""Finding parser for codexw.
+
+This module extracts structured findings from Codex review output.
+Keeps parsing logic isolated from orchestration.
+""" + +from __future__ import annotations + +import re +from typing import Any + +from .constants import NO_FINDINGS_SENTINEL + + +def extract_line_number(raw: str) -> int | None: + """Extract line number from a line reference string.""" + match = re.search(r"\d+", raw) + if not match: + return None + try: + number = int(match.group(0)) + except ValueError: + return None + return number if number > 0 else None + + +def normalize_finding_line(raw_line: str) -> str: + """Normalize a finding line by removing markdown formatting.""" + line = raw_line.strip() + if not line: + return "" + + # Remove bullet points and numbering + line = re.sub(r"^[-*+]\s*", "", line) + line = re.sub(r"^\d+[.)]\s*", "", line) + + # Remove bold/code formatting + line = re.sub(r"^\*\*([^*]+)\*\*\s*", r"\1 ", line) + line = re.sub(r"^__([^_]+)__\s*", r"\1 ", line) + line = re.sub(r"^`([^`]+)`\s*", r"\1 ", line) + + # Normalize spacing around colons + line = re.sub(r"\s+:\s*", ": ", line, count=1) + return line + + +def parse_findings_from_pass(text: str, pass_id: str) -> list[dict[str, Any]]: + """Parse structured findings from pass output text.""" + findings: list[dict[str, Any]] = [] + current: dict[str, Any] | None = None + + def flush() -> None: + nonlocal current + if not current: + return + severity = str(current.get("severity", "")).strip().upper() + file_path = str(current.get("file_path", "")).strip() + if severity and file_path: + current["pass_id"] = pass_id + current["line"] = extract_line_number(str(current.get("line_raw", ""))) + findings.append(current) + current = None + + for raw_line in text.splitlines(): + line = normalize_finding_line(raw_line) + if not line: + continue + if NO_FINDINGS_SENTINEL in line: + continue + + # New finding starts with Severity + severity_match = re.match(r"(?i)^severity\s*:\s*(P[0-3])\b", line) + if severity_match: + flush() + current = { + "severity": severity_match.group(1).upper(), + "type": "", + "file_path": "", + "line_raw": "", + "rule": 
"", + "risk": "", + "fix": "", + "title": "", + } + continue + + if not current: + continue + + # Parse finding fields + if re.match(r"(?i)^type\s*:", line): + current["type"] = line.split(":", 1)[1].strip() + elif re.match(r"(?i)^(file\s*path|path|file)\s*:", line): + current["file_path"] = line.split(":", 1)[1].strip() + elif re.match(r"(?i)^(line|line\s*number|precise line number|line range)\s*:", line): + current["line_raw"] = line.split(":", 1)[1].strip() + elif re.match(r"(?i)^violated rule", line): + current["rule"] = line.split(":", 1)[1].strip() + elif re.match(r"(?i)^why this is risky\s*:", line): + current["risk"] = line.split(":", 1)[1].strip() + elif re.match(r"(?i)^minimal fix direction\s*:", line): + current["fix"] = line.split(":", 1)[1].strip() + elif re.match(r"(?i)^title\s*:", line): + current["title"] = line.split(":", 1)[1].strip() + # Continuation of risk description + elif current.get("risk"): + current["risk"] = f"{current['risk']} {line}".strip() + + flush() + return findings + + +def pass_has_no_findings(text: str, parsed_findings: list[dict[str, Any]] | None = None) -> bool: + """Check if pass output indicates no actionable findings.""" + if NO_FINDINGS_SENTINEL not in text: + return False + if parsed_findings is None: + parsed_findings = parse_findings_from_pass(text, "probe") + return len(parsed_findings) == 0 diff --git a/codexw/git.py b/codexw/git.py new file mode 100644 index 0000000..6f50716 --- /dev/null +++ b/codexw/git.py @@ -0,0 +1,167 @@ +"""Git operations for codexw. + +This module encapsulates all git-related functionality, providing a clean +interface for the rest of the codebase. Keeps git commands isolated from +review logic. 
+""" + +from __future__ import annotations + +import subprocess +from pathlib import Path + +from .utils import CodexwError, run_checked + + +def list_untracked_files(repo_root: Path) -> list[str]: + """Return untracked file paths relative to repo root.""" + out = run_checked( + ["git", "ls-files", "--others", "--exclude-standard"], + repo_root, + ) + return sorted({line.strip() for line in out.splitlines() if line.strip()}) + + +def count_file_lines(path: Path) -> int: + """Best-effort line count for a file on disk.""" + try: + with path.open("rb") as fh: + newline_count = 0 + saw_any_bytes = False + ends_with_newline = False + + while True: + chunk = fh.read(64 * 1024) + if not chunk: + break + saw_any_bytes = True + newline_count += chunk.count(b"\n") + ends_with_newline = chunk.endswith(b"\n") + except OSError: + return 0 + if not saw_any_bytes: + return 0 + return newline_count + (0 if ends_with_newline else 1) + + +def find_repo_root(start: Path) -> Path: + """Find the git repository root from a starting path.""" + try: + out = run_checked(["git", "rev-parse", "--show-toplevel"], start).strip() + if out: + return Path(out) + except CodexwError: + pass + return start + + +def git_ref_exists(repo_root: Path, ref: str) -> bool: + """Check if a git ref exists.""" + proc = subprocess.run( + ["git", "show-ref", "--verify", "--quiet", ref], + cwd=str(repo_root), + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + text=True, + check=False, + ) + return proc.returncode == 0 + + +def detect_default_base(repo_root: Path) -> str: + """Detect the default base branch (master or main).""" + # Prefer local branches; if only remote-tracking exists, return + # remote-qualified ref so diff commands remain valid in detached clones. 
+ for candidate in ("master", "main"): + if git_ref_exists(repo_root, f"refs/heads/{candidate}"): + return candidate + for candidate in ("master", "main"): + if git_ref_exists(repo_root, f"refs/remotes/origin/{candidate}"): + return f"origin/{candidate}" + + return "main" + + +def resolve_base_ref(repo_root: Path, base: str) -> str: + """Resolve branch-like base to a usable git ref. + + If the requested base is a plain branch name and no local branch exists + but `origin/` does, return `origin/`. + """ + raw = str(base).strip() + if not raw: + return raw + if "/" in raw: + return raw + if git_ref_exists(repo_root, f"refs/heads/{raw}"): + return raw + if git_ref_exists(repo_root, f"refs/remotes/origin/{raw}"): + return f"origin/{raw}" + return raw + + +def collect_changed_files(repo_root: Path, mode: str, base: str, commit: str) -> list[str]: + """Collect list of changed files based on review mode.""" + if mode == "base": + base_ref = resolve_base_ref(repo_root, base) + out = run_checked(["git", "diff", "--name-only", f"{base_ref}...HEAD"], repo_root) + return sorted({line.strip() for line in out.splitlines() if line.strip()}) + + if mode == "uncommitted": + out1 = run_checked(["git", "diff", "--name-only", "HEAD"], repo_root) + out2 = "\n".join(list_untracked_files(repo_root)) + return sorted({line.strip() for line in (out1 + "\n" + out2).splitlines() if line.strip()}) + + if mode == "commit": + out = run_checked(["git", "show", "--name-only", "--pretty=", commit], repo_root) + return sorted({line.strip() for line in out.splitlines() if line.strip()}) + + raise CodexwError(f"unsupported mode: {mode}") + + +def collect_numstat(repo_root: Path, mode: str, base: str, commit: str) -> list[tuple[int, str]]: + """Collect file change statistics (added + deleted lines per file).""" + if mode == "base": + base_ref = resolve_base_ref(repo_root, base) + cmd = ["git", "diff", "--numstat", f"{base_ref}...HEAD"] + elif mode == "uncommitted": + cmd = ["git", "diff", "--numstat", 
"HEAD"] + elif mode == "commit": + cmd = ["git", "show", "--numstat", "--pretty=", commit] + else: + raise CodexwError(f"unsupported mode: {mode}") + + out = run_checked(cmd, repo_root) + changes_by_path: dict[str, int] = {} + for raw in out.splitlines(): + parts = raw.split("\t") + if len(parts) < 3: + continue + add_raw, del_raw, path = parts[0], parts[1], parts[2] + add = int(add_raw) if add_raw.isdigit() else 0 + rem = int(del_raw) if del_raw.isdigit() else 0 + changes_by_path[path] = add + rem + + # `git diff --numstat HEAD` excludes untracked files; include them so + # hotspot depth passes can prioritize newly added files during local review. + if mode == "uncommitted": + for rel_path in list_untracked_files(repo_root): + if rel_path in changes_by_path: + continue + changes_by_path[rel_path] = count_file_lines(repo_root / rel_path) + + rows = [(delta, path) for path, delta in changes_by_path.items()] + rows.sort(key=lambda x: x[0], reverse=True) + return rows + + +def changed_modules(changed_files: list[str]) -> list[tuple[int, str]]: + """Group changed files by top-level module (first 2 path components).""" + counts: dict[str, int] = {} + for path in changed_files: + parts = path.split("/") + key = "/".join(parts[:2]) if len(parts) >= 2 else parts[0] + counts[key] = counts.get(key, 0) + 1 + rows = [(count, module) for module, count in counts.items()] + rows.sort(key=lambda x: (-x[0], x[1])) + return rows diff --git a/codexw/local-review-profile.duolingo-android.example.yaml b/codexw/local-review-profile.duolingo-android.example.yaml new file mode 100644 index 0000000..2dce3c8 --- /dev/null +++ b/codexw/local-review-profile.duolingo-android.example.yaml @@ -0,0 +1,66 @@ +version: 1 + +repo: + name: Duolingo Android + +review: + default_base: master + strict_gate: true + depth_hotspots: 3 + output_root: .codex/review-runs + +rules: + include: + - AGENTS.md + - .cursor/rules/**/*.mdc + +domains: + default: + - core + - experiments + - compose + - coroutines + - 
testing + allowed: + - core + - experiments + - compose + - coroutines + - testing + +prompts: + global: | + Prioritize behavior-changing issues over style concerns. + Treat AGENTS.md and synced AI rules as enforcement sources. + by_domain: + experiments: | + Focus on experiment gating correctness, control/treatment isolation, and cleanup safety. + compose: | + Focus on design-system conformance, state/recomposition behavior, and lifecycle safety. + coroutines: | + Focus on structured concurrency, cancellation behavior, and dispatcher/lifecycle correctness. + testing: | + Focus on missing tests for high-risk paths and weak assertions around side effects. + +pipeline: + include_policy_pass: true + include_core_passes: true + include_domain_passes: true + include_depth_passes: true + core_passes: + - id: core-breadth + name: Core breadth coverage + instructions: | + Review every changed file in this shard and report actionable findings only. + - id: core-regressions + name: Core regressions + instructions: | + Focus on behavioral regressions, crash/nullability, and security/privacy issues. + - id: core-architecture + name: Core architecture + instructions: | + Focus on architecture boundaries, concurrency, and lifecycle correctness. + - id: core-tests + name: Core tests + instructions: | + Focus on missing tests and high-risk edge cases without coverage. diff --git a/codexw/local-review-profile.repo.example.yaml b/codexw/local-review-profile.repo.example.yaml new file mode 100644 index 0000000..7d48b3d --- /dev/null +++ b/codexw/local-review-profile.repo.example.yaml @@ -0,0 +1,53 @@ +version: 1 + +repo: + name: Repo Name + +review: + default_base: main + strict_gate: true + depth_hotspots: 3 + output_root: .codex/review-runs + +rules: + include: + - AGENTS.md + - .cursor/rules/**/*.mdc + +domains: + default: + - core + - testing + allowed: + - core + - testing + +prompts: + global: | + Add repository-wide review context here. 
+ by_domain: + testing: | + Add testing-specific context for this repository. + +pipeline: + include_policy_pass: true + include_core_passes: true + include_domain_passes: true + include_depth_passes: true + core_passes: + - id: core-breadth + name: Core breadth coverage + instructions: | + Review every changed file in this shard and report actionable findings only. + - id: core-regressions + name: Core regressions + instructions: | + Focus on behavioral regressions, crash/nullability, and security/privacy issues. + - id: core-architecture + name: Core architecture + instructions: | + Focus on architecture boundaries, concurrency, and lifecycle correctness. + - id: core-tests + name: Core tests + instructions: | + Focus on missing tests and high-risk edge cases without coverage. diff --git a/codexw/passes.py b/codexw/passes.py new file mode 100644 index 0000000..ee206f4 --- /dev/null +++ b/codexw/passes.py @@ -0,0 +1,624 @@ +"""Pass orchestration for codexw. + +This module handles the execution of review passes against the Codex CLI. 
+Includes retry logic for: +- CLI prompt+target compatibility issues +- Model availability fallback (recursive predecessor chain) +- Reasoning-effort fallback when model-specific settings are unsupported +""" + +from __future__ import annotations + +import re +import sys +from collections import deque +from dataclasses import dataclass +from pathlib import Path +from typing import Any + +from .constants import ( + COMPAT_CODE_HINTS, + DEFAULT_DEPTH_PASS_INSTRUCTIONS, + DEFAULT_MODEL_FALLBACK_WINDOW, + MODEL_UNAVAILABLE_CODE_HINTS, + PREVIOUS_MAJOR_MINOR_CANDIDATES, + REASONING_EFFORT_ORDER, + REASONING_PARAM_HINTS, +) +from .finding_parser import parse_findings_from_pass, pass_has_no_findings +from .prompts import ( + build_base_rubric, + build_diff_context, + build_domain_prompt, + build_pass_prompt, + build_rule_block, +) +from .utils import CodexwError, run_captured, sanitize_pass_id + +MODEL_NAME_RE = re.compile(r"\b(gpt-\d+(?:\.\d+)?-codex)\b", re.IGNORECASE) +ERROR_CODE_RE = re.compile(r'(?im)(?:"code"|code)\s*[:=]\s*["\']?([a-z0-9_.-]+)') +ERROR_PARAM_RE = re.compile(r'(?im)(?:"param"|param(?:eter)?)\s*[:=]\s*["\']?([a-z0-9_.-]+)') + + +def extract_error_codes_and_params(output: str) -> tuple[set[str], set[str]]: + """Extract structured error code/param hints from mixed CLI output.""" + text = output.lower() + codes = {m.group(1).strip().lower() for m in ERROR_CODE_RE.finditer(text)} + params = {m.group(1).strip().lower() for m in ERROR_PARAM_RE.finditer(text)} + return codes, params + + +@dataclass(frozen=True) +class PassSpec: + """Specification for a single review pass. 
+ + Attributes: + id: Unique identifier for the pass (used in filenames) + name: Human-readable name (displayed during execution) + prompt: The full prompt to send to Codex CLI + """ + + id: str + name: str + prompt: str + + +@dataclass +class ModelFallbackState: + """Mutable state shared across passes for model/effort fallback reuse.""" + + preferred_model: str | None = None + selected_model: str | None = None + selected_effort: str | None = None + + +class RetryStrategy: + """Strategy for retrying failed Codex CLI calls.""" + + @staticmethod + def should_retry_with_compat(output: str) -> bool: + """Check if failure indicates prompt+target incompatibility.""" + text = output.lower() + codes, _ = extract_error_codes_and_params(text) + if codes & COMPAT_CODE_HINTS: + return True + if "cannot be used with '[prompt]'" in text: + return True + if "cannot be used with" in text and "[prompt]" in text: + return True + return False + + @staticmethod + def model_unavailable(output: str) -> bool: + """Check if failure indicates missing/inaccessible model.""" + if RetryStrategy.reasoning_effort_unsupported(output): + return False + text = output.lower() + codes, _ = extract_error_codes_and_params(text) + if codes & MODEL_UNAVAILABLE_CODE_HINTS: + return True + + return ( + "model_not_found" in text + or "does not exist or you do not have access to it" in text + or ("model" in text and "not supported" in text) + or ("model" in text and "unsupported" in text) + or ("model" in text and "unknown" in text) + or ("model" in text and "not found" in text) + or ("model" in text and "unavailable" in text) + ) + + @staticmethod + def reasoning_effort_unsupported(output: str) -> bool: + """Check if failure indicates unsupported model_reasoning_effort.""" + text = output.lower() + _, params = extract_error_codes_and_params(text) + if params & REASONING_PARAM_HINTS: + return True + + if not any( + marker in text + for marker in ("model_reasoning_effort", "reasoning.effort", "reasoning 
effort") + ): + if not ( + ("reasoning" in text or "effort" in text) + and any(e in text for e in REASONING_EFFORT_ORDER) + ): + return False + return ( + "unsupported" in text + or "not supported" in text + or "invalid value" in text + or "must be one of" in text + or "supported values" in text + ) + + +def normalize_model_name(model: str | None) -> str | None: + """Normalize model name to lowercase, or None when empty.""" + if not model: + return None + normalized = str(model).strip().lower() + return normalized or None + + +def build_model_fallback_chain( + start_model: str, + *, + max_models: int = DEFAULT_MODEL_FALLBACK_WINDOW, +) -> list[str]: + """Build recency-biased predecessor chain for a Codex model. + + Policy: + - Keep fallback focused on recent models to avoid obsolete tails. + - Prefer same-major predecessors first. + - Then probe likely recent variants from prior major(s): .2, .1, base. + """ + model = normalize_model_name(start_model) + if not model: + return [] + if max_models <= 0: + return [] + + match = re.fullmatch(r"gpt-(\d+)(?:\.(\d+))?-codex", model) + if not match: + return [model] + + major = int(match.group(1)) + minor = int(match.group(2)) if match.group(2) is not None else None + + chain: list[str] = [] + seen: set[str] = set() + + def append_candidate(candidate: str) -> bool: + if candidate in seen: + return False + seen.add(candidate) + chain.append(candidate) + return len(chain) >= max_models + + append_candidate(model) + + # Same-major predecessors (e.g. 5.3 -> 5.2 -> 5.1 -> 5) + if minor is not None: + for prev_minor in range(minor - 1, 0, -1): + if append_candidate(f"gpt-{major}.{prev_minor}-codex"): + return chain + if append_candidate(f"gpt-{major}-codex"): + return chain + + # Prior major recency probes (include .2 explicitly, then .1, then base). 
+ prev_major = major - 1 + while prev_major >= 1 and len(chain) < max_models: + for prev_minor in PREVIOUS_MAJOR_MINOR_CANDIDATES: + if append_candidate(f"gpt-{prev_major}.{prev_minor}-codex"): + return chain + if append_candidate(f"gpt-{prev_major}-codex"): + return chain + prev_major -= 1 + + return chain + + +def extract_model_from_output(output: str) -> str | None: + """Extract first model-like token from CLI output.""" + match = MODEL_NAME_RE.search(output) + if not match: + return None + return normalize_model_name(match.group(1)) + + +def extract_configured_effort_from_output(output: str) -> str | None: + """Extract configured effort token from output, when present.""" + text = output.lower() + lines = text.splitlines() + for line in lines: + if not any( + marker in line + for marker in ("model_reasoning_effort", "reasoning.effort", "reasoning effort") + ): + continue + for effort in REASONING_EFFORT_ORDER: + if re.search(rf"\b{re.escape(effort)}\b", line): + return effort + for effort in REASONING_EFFORT_ORDER: + if re.search(rf"\b{re.escape(effort)}\b", text): + return effort + return None + + +def extract_supported_effort_from_output(output: str) -> str | None: + """Extract highest-priority supported effort from output hints.""" + text = output.lower() + if not RetryStrategy.reasoning_effort_unsupported(output): + return None + + for effort in REASONING_EFFORT_ORDER[1:]: + if re.search(rf"\b{re.escape(effort)}\b", text): + return effort + return None + + +def next_lower_effort(current_effort: str | None) -> str | None: + """Return next lower effort in fallback order.""" + if not current_effort: + return "high" + normalized = current_effort.strip().lower() + try: + idx = REASONING_EFFORT_ORDER.index(normalized) + except ValueError: + return "high" + next_idx = idx + 1 + if next_idx >= len(REASONING_EFFORT_ORDER): + return None + return REASONING_EFFORT_ORDER[next_idx] + + +def build_review_cmd( + *, + target_args: list[str], + prompt: str, + model: str | 
None, + effort: str | None, +) -> list[str]: + """Build codex review command with optional model/effort overrides.""" + cmd = ["codex", "review", *target_args] + if model: + cmd += ["-c", f'model="{model}"'] + if effort: + cmd += ["-c", f'model_reasoning_effort="{effort}"'] + cmd.append(prompt) + return cmd + + +def next_fallback_model( + *, + anchor_model: str, + effort: str | None, + tried_attempts: set[tuple[str, str]], +) -> str | None: + """Return next predecessor model not yet attempted for current effort.""" + chain = build_model_fallback_chain(anchor_model) + if len(chain) <= 1: + return None + + for candidate in chain[1:]: + key = (candidate, effort or "") + if key not in tried_attempts: + return candidate + return None + + +def run_review_pass_with_fallback( + *, + repo_root: Path, + out_file: Path, + target_args: list[str], + pass_spec: PassSpec, + prompt: str, + model_state: ModelFallbackState, + allow_compat_short_circuit: bool, +) -> tuple[int, str]: + """Run codex review with model/effort fallback, return (exit_code, output).""" + attempted: set[tuple[str, str]] = set() + + initial_model = normalize_model_name(model_state.selected_model) or normalize_model_name( + model_state.preferred_model + ) + fallback_anchor_model = initial_model + initial_effort = model_state.selected_effort + + queue: deque[tuple[str | None, str | None]] = deque() + queue.append((initial_model, initial_effort)) + + last_exit = 1 + last_output = "" + + # Best-effort traversal: exhaust all discovered model/effort candidates. + # Termination is guaranteed by attempted-set deduplication. 
+ while queue: + model, effort = queue.popleft() + model = normalize_model_name(model) + effort = effort.strip().lower() if isinstance(effort, str) and effort.strip() else None + + key = (model or "", effort or "") + if key in attempted: + continue + attempted.add(key) + + cmd = build_review_cmd( + target_args=target_args, + prompt=prompt, + model=model, + effort=effort, + ) + exit_code = run_captured(cmd, repo_root, out_file, stream_output=True) + last_output = out_file.read_text(encoding="utf-8", errors="replace") + last_exit = exit_code + + if exit_code == 0: + model_state.selected_model = model + model_state.selected_effort = effort + return 0, last_output + + if allow_compat_short_circuit and RetryStrategy.should_retry_with_compat(last_output): + return exit_code, last_output + + if RetryStrategy.model_unavailable(last_output): + if not fallback_anchor_model: + fallback_anchor_model = ( + model + or extract_model_from_output(last_output) + or normalize_model_name(model_state.preferred_model) + ) + if fallback_anchor_model: + next_model = next_fallback_model( + anchor_model=fallback_anchor_model, + effort=effort, + tried_attempts=attempted, + ) + if next_model: + print( + f"warning: model '{model or fallback_anchor_model}' unavailable; retrying pass " + f"'{pass_spec.name}' with predecessor model '{next_model}'.", + file=sys.stderr, + ) + queue.appendleft((next_model, effort)) + continue + + if RetryStrategy.reasoning_effort_unsupported(last_output): + supported_effort = extract_supported_effort_from_output(last_output) + if supported_effort and (model or "", supported_effort) not in attempted: + print( + f"warning: model_reasoning_effort unsupported; retrying pass " + f"'{pass_spec.name}' with '{supported_effort}'.", + file=sys.stderr, + ) + queue.appendleft((model, supported_effort)) + continue + + configured_effort = effort or extract_configured_effort_from_output(last_output) + lower_effort = next_lower_effort(configured_effort) + if lower_effort and (model 
or "", lower_effort) not in attempted: + from_effort = configured_effort or "configured-default" + print( + f"warning: model_reasoning_effort '{from_effort}' unsupported; retrying " + f"pass '{pass_spec.name}' with '{lower_effort}'.", + file=sys.stderr, + ) + queue.appendleft((model, lower_effort)) + continue + + break + + return last_exit, last_output + + +def run_review_pass_with_compat( + repo_root: Path, + out_file: Path, + target_args: list[str], + target_desc: str, + pass_spec: PassSpec, + model_state: ModelFallbackState, +) -> None: + """Run a review pass with compatibility retry.""" + exit_code, content = run_review_pass_with_fallback( + repo_root=repo_root, + out_file=out_file, + target_args=target_args, + pass_spec=pass_spec, + prompt=pass_spec.prompt, + model_state=model_state, + allow_compat_short_circuit=True, + ) + if exit_code == 0: + return + + if RetryStrategy.should_retry_with_compat(content) and target_args: + print( + f"warning: codex CLI rejected prompt+target flags; " + f"retrying pass '{pass_spec.name}' in prompt-only compatibility mode.", + file=sys.stderr, + ) + compat_prefix = ( + "Target selection requested for this pass:\n" + f"- {target_desc}\n" + "Apply review findings to the requested target using the repository context below." + ) + exit_code, _ = run_review_pass_with_fallback( + repo_root=repo_root, + out_file=out_file, + target_args=[], + pass_spec=pass_spec, + prompt=f"{compat_prefix}\n\n{pass_spec.prompt}", + model_state=model_state, + allow_compat_short_circuit=False, + ) + if exit_code == 0: + return + + raise CodexwError( + f"codex review failed in pass '{pass_spec.name}' with exit code {exit_code}. " + f"See {out_file} for details." 
+ ) + + +class PassBuilder: + """Builds the list of passes to execute based on profile configuration.""" + + def __init__( + self, + profile: dict[str, Any], + rule_files: list[str], + changed_files: list[str], + modules: list[tuple[int, str]], + hotspots: list[str], + selected_domains: list[str], + ) -> None: + self.profile = profile + self.rule_files = rule_files + self.changed_files = changed_files + self.modules = modules + self.hotspots = hotspots + self.selected_domains = selected_domains + + # Build reusable prompt components + self.base_rubric = build_base_rubric(profile["repo_name"]) + self.rules_block = build_rule_block(rule_files) + self.diff_context = build_diff_context(changed_files, modules, hotspots) + self.global_prompt = profile.get("global_prompt", "") + + def _build_prompt(self, extra: str) -> str: + """Build a complete pass prompt.""" + return build_pass_prompt( + self.base_rubric, + self.rules_block, + self.diff_context, + self.global_prompt, + extra, + ) + + def build_passes(self) -> list[PassSpec]: + """Build list of PassSpec objects for execution.""" + passes: list[PassSpec] = [] + pipeline = self.profile["pipeline"] + pass_counter = 0 + + # Policy pass + if pipeline.get("include_policy_pass", True): + pass_counter += 1 + passes.append( + PassSpec( + id=f"pass-{pass_counter}-policy-sweep", + name="Policy: full standards coverage sweep", + prompt=self._build_prompt(str(pipeline.get("policy_instructions", ""))), + ) + ) + + # Core passes + if pipeline.get("include_core_passes", True) and "core" in self.selected_domains: + core_passes = pipeline.get("core_passes") or [] + for core_pass in core_passes: + pass_id = sanitize_pass_id(str(core_pass.get("id", "core-pass"))) + pass_name = str(core_pass.get("name", pass_id)).strip() or pass_id + instructions = str(core_pass.get("instructions", "")).strip() + if not instructions: + continue + pass_counter += 1 + passes.append( + PassSpec( + id=f"pass-{pass_counter}-{pass_id}", + name=pass_name, + 
prompt=self._build_prompt(instructions), + ) + ) + + # Domain passes + if pipeline.get("include_domain_passes", True): + for domain in self.selected_domains: + if domain == "core": + continue + pass_counter += 1 + slug = sanitize_pass_id(domain) + passes.append( + PassSpec( + id=f"pass-{pass_counter}-domain-{slug}", + name=f"Domain: {domain}", + prompt=self._build_prompt(build_domain_prompt(domain, self.profile)), + ) + ) + + # Depth passes + if pipeline.get("include_depth_passes", True): + depth_template = str( + pipeline.get("depth_instructions", DEFAULT_DEPTH_PASS_INSTRUCTIONS) + ) + for hotspot in self.hotspots: + pass_counter += 1 + hotspot_slug = sanitize_pass_id(hotspot.replace("/", "_")) + try: + depth_instructions = depth_template.format(hotspot=hotspot) + except Exception: + depth_instructions = DEFAULT_DEPTH_PASS_INSTRUCTIONS.format(hotspot=hotspot) + passes.append( + PassSpec( + id=f"pass-{pass_counter}-depth-{hotspot_slug}", + name=f"Depth hotspot: {hotspot}", + prompt=self._build_prompt(depth_instructions), + ) + ) + + return passes + + +class PassRunner: + """Executes review passes and collects results.""" + + def __init__( + self, + repo_root: Path, + output_root: Path, + target_args: list[str], + target_desc: str, + model_override: str | None = None, + ) -> None: + self.repo_root = repo_root + self.output_root = output_root + self.target_args = target_args + self.target_desc = target_desc + self.model_state = ModelFallbackState( + preferred_model=normalize_model_name(model_override), + ) + + def run_all( + self, + passes: list[PassSpec], + ) -> tuple[list[str], list[dict[str, Any]], list[Path]]: + """Run all passes, return (summary_lines, raw_findings, executed_pass_files).""" + summary_lines: list[str] = [] + raw_findings: list[dict[str, Any]] = [] + executed_pass_files: list[Path] = [] + + for index, pass_spec in enumerate(passes, start=1): + out_file = self.output_root / f"{pass_spec.id}.md" + executed_pass_files.append(out_file) + print(f"\n==> 
({index}/{len(passes)}) {pass_spec.name}") + + run_review_pass_with_compat( + repo_root=self.repo_root, + out_file=out_file, + target_args=self.target_args, + target_desc=self.target_desc, + pass_spec=pass_spec, + model_state=self.model_state, + ) + + text = out_file.read_text(encoding="utf-8", errors="replace") + parsed = parse_findings_from_pass(text, pass_spec.id) + no_findings = pass_has_no_findings(text, parsed) + + # Handle unparsed findings + if not no_findings and not parsed: + parsed = [ + { + "severity": "P2", + "type": "UnparsedFinding", + "file_path": "(unparsed-output)", + "line_raw": "", + "line": None, + "rule": "", + "risk": "Pass output contained findings but did not match structured schema.", + "fix": "Ensure findings follow the required schema.", + "title": pass_spec.name, + "pass_id": pass_spec.id, + } + ] + + if no_findings: + summary_lines.append(f"- [PASS] {pass_spec.name}") + else: + summary_lines.append(f"- [FINDINGS] {pass_spec.name}") + raw_findings.extend(parsed) + + return summary_lines, raw_findings, executed_pass_files diff --git a/codexw/profile.py b/codexw/profile.py new file mode 100644 index 0000000..723391b --- /dev/null +++ b/codexw/profile.py @@ -0,0 +1,518 @@ +"""Profile management for codexw. + +This module handles loading, normalizing, syncing, and writing +review profile files. Profiles define repository-specific review +configuration. 
+""" + +from __future__ import annotations + +import datetime as dt +import glob +import json +import re +from pathlib import Path +from typing import Any, Sequence + +from .constants import ( + DEFAULT_BASE_BRANCH, + DEFAULT_CORE_PASS_SPECS, + DEFAULT_DEPTH_HOTSPOTS, + DEFAULT_DEPTH_PASS_INSTRUCTIONS, + DEFAULT_GLOBAL_PROMPT, + DEFAULT_OUTPUT_ROOT, + DEFAULT_POLICY_PASS_INSTRUCTIONS, + DEFAULT_RULE_PATTERNS, + DEFAULT_STRICT_GATE, +) +from .git import detect_default_base +from .utils import ( + CodexwError, + ensure_dict, + stable_json, + to_bool, + to_int, + to_nonempty_string, + to_string_list, + unique, +) +from .yaml_fallback import try_load_yaml +from .yaml_writer import dump_yaml_text + + +def load_profile(path: Path) -> dict[str, Any]: + """Load profile from YAML or JSON file.""" + text = path.read_text(encoding="utf-8") + + try: + import yaml + + data = yaml.safe_load(text) + if not isinstance(data, dict): + raise CodexwError(f"profile at {path} must be a mapping/object") + return data + except ModuleNotFoundError: + pass + except Exception as exc: + raise CodexwError(f"invalid YAML in {path}: {exc}") + + try: + data = json.loads(text) + except json.JSONDecodeError: + from .yaml_fallback import parse_simple_yaml + + try: + data = parse_simple_yaml(text) + except ValueError as exc: + raise CodexwError("PyYAML not available and profile parsing failed. 
" f"Details: {exc}") + + if not isinstance(data, dict): + raise CodexwError(f"profile at {path} must be a mapping/object") + return data + + +def write_profile(path: Path, profile: dict[str, Any]) -> None: + """Write profile to YAML file.""" + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text(dump_yaml_text(profile), encoding="utf-8") + + +def normalize_profile(raw: dict[str, Any]) -> dict[str, Any]: + """Normalize raw profile dict into consistent structure.""" + repo = raw.get("repo") or {} + review = raw.get("review") or {} + rules = raw.get("rules") or {} + domains = raw.get("domains") or {} + prompts = raw.get("prompts") or {} + pipeline = raw.get("pipeline") or {} + + if not isinstance(repo, dict): + repo = {} + if not isinstance(review, dict): + review = {} + if not isinstance(rules, dict): + rules = {} + if not isinstance(domains, dict): + domains = {} + if not isinstance(prompts, dict): + prompts = {} + if not isinstance(pipeline, dict): + pipeline = {} + + allowed_domains = to_string_list(domains.get("allowed"), ["core"]) + default_domains = to_string_list(domains.get("default"), allowed_domains) + if not allowed_domains: + allowed_domains = ["core"] + if not default_domains: + default_domains = list(allowed_domains) + + domain_prompt_map = prompts.get("by_domain") + if not isinstance(domain_prompt_map, dict): + domain_prompt_map = {} + + pipeline_core_raw = pipeline.get("core_passes") + if not isinstance(pipeline_core_raw, list) or not pipeline_core_raw: + pipeline_core_raw = DEFAULT_CORE_PASS_SPECS + + pipeline_core_passes: list[dict[str, str]] = [] + for idx, raw_pass in enumerate(pipeline_core_raw, start=1): + if not isinstance(raw_pass, dict): + continue + pass_id = str(raw_pass.get("id", f"core-pass-{idx}")).strip() or f"core-pass-{idx}" + pass_name = str(raw_pass.get("name", pass_id)).strip() or pass_id + instructions = str(raw_pass.get("instructions", "")).strip() + if not instructions: + continue + pipeline_core_passes.append( + { + 
"id": pass_id, + "name": pass_name, + "instructions": instructions, + } + ) + + if not pipeline_core_passes: + pipeline_core_passes = json.loads(json.dumps(DEFAULT_CORE_PASS_SPECS)) + + return { + "version": str(raw.get("version", "1")), + "repo_name": to_nonempty_string(repo.get("name"), "Repository"), + "default_base": to_nonempty_string(review.get("default_base"), DEFAULT_BASE_BRANCH), + "strict_gate": to_bool(review.get("strict_gate"), DEFAULT_STRICT_GATE), + "depth_hotspots": to_int(review.get("depth_hotspots"), DEFAULT_DEPTH_HOTSPOTS), + "output_root": to_nonempty_string(review.get("output_root"), DEFAULT_OUTPUT_ROOT), + "rule_patterns": to_string_list(rules.get("include"), DEFAULT_RULE_PATTERNS), + "default_domains": default_domains, + "allowed_domains": allowed_domains, + "global_prompt": str(prompts.get("global", "")).strip(), + "domain_prompts": { + str(k): str(v).strip() for k, v in domain_prompt_map.items() if str(v).strip() + }, + "pipeline": { + "include_policy_pass": to_bool(pipeline.get("include_policy_pass"), True), + "include_core_passes": to_bool(pipeline.get("include_core_passes"), True), + "include_domain_passes": to_bool(pipeline.get("include_domain_passes"), True), + "include_depth_passes": to_bool(pipeline.get("include_depth_passes"), True), + "policy_instructions": str( + pipeline.get("policy_instructions", DEFAULT_POLICY_PASS_INSTRUCTIONS) + ).strip() + or DEFAULT_POLICY_PASS_INSTRUCTIONS, + "core_passes": pipeline_core_passes, + "depth_instructions": str( + pipeline.get("depth_instructions", DEFAULT_DEPTH_PASS_INSTRUCTIONS) + ).strip() + or DEFAULT_DEPTH_PASS_INSTRUCTIONS, + }, + } + + +def infer_repo_name(repo_root: Path) -> str: + """Infer repository name from directory name.""" + raw = repo_root.name.strip() + if not raw: + return "Repository" + + tokens = [t for t in re.split(r"[-_]+", raw) if t] + if not tokens: + return raw + + special = { + "ios": "iOS", + "android": "Android", + "api": "API", + "sdk": "SDK", + "ml": "ML", + "ai": 
"AI", + "ui": "UI", + } + + def normalize(token: str) -> str: + return special.get(token.lower(), token.capitalize()) + + return " ".join(normalize(t) for t in tokens) + + +def infer_rule_patterns(repo_root: Path) -> list[str]: + """Infer rule patterns from repository structure.""" + patterns: list[str] = [] + if (repo_root / "AGENTS.md").is_file(): + patterns.append("AGENTS.md") + if (repo_root / ".cursor/rules").is_dir(): + patterns.append(".cursor/rules/**/*.mdc") + if (repo_root / ".code_review").is_dir(): + patterns.append(".code_review/**/*.md") + if not patterns: + patterns = list(DEFAULT_RULE_PATTERNS) + return patterns + + +def discover_rule_files(repo_root: Path, patterns: Sequence[str]) -> list[str]: + """Discover rule files matching patterns.""" + matches: set[str] = set() + for pattern in patterns: + expanded = glob.glob(str(repo_root / pattern), recursive=True) + for abs_path in expanded: + p = Path(abs_path) + if not p.is_file(): + continue + try: + rel = p.relative_to(repo_root) + except ValueError: + continue + matches.add(str(rel)) + return sorted(matches) + + +def validate_rule_patterns(repo_root: Path, patterns: Sequence[str]) -> tuple[list[str], list[str]]: + """Validate rule patterns, return (valid_patterns, warnings).""" + valid: list[str] = [] + warnings: list[str] = [] + for pattern in patterns: + normalized = str(pattern).strip() + if not normalized: + continue + matches = discover_rule_files(repo_root, [normalized]) + if matches: + valid.append(normalized) + continue + if any(ch in normalized for ch in "*?[]"): + warnings.append(f"rule pattern '{normalized}' matched no files") + else: + warnings.append(f"rule file '{normalized}' not found") + return valid, warnings + + +def default_domain_prompt_template(domain: str) -> str: + """Generate default domain-specific prompt template.""" + return ( + f"Domain focus: {domain}\n" + "Focus areas:\n" + "- domain-specific correctness and policy compliance\n" + "- behavior/regression risks and 
boundary-condition failures\n" + "- state, contract, lifecycle, or concurrency issues relevant to this domain\n" + "- missing or weak tests for critical domain behavior" + ) + + +def default_pipeline_config() -> dict[str, Any]: + """Return default pipeline configuration.""" + return { + "include_policy_pass": True, + "include_core_passes": True, + "include_domain_passes": True, + "include_depth_passes": True, + "policy_instructions": DEFAULT_POLICY_PASS_INSTRUCTIONS, + "core_passes": json.loads(json.dumps(DEFAULT_CORE_PASS_SPECS)), + "depth_instructions": DEFAULT_DEPTH_PASS_INSTRUCTIONS, + } + + +def build_bootstrap_profile(repo_root: Path) -> dict[str, Any]: + """Build initial profile from repository signals.""" + rule_patterns = infer_rule_patterns(repo_root) + rule_metadata = discover_rule_metadata(repo_root, rule_patterns) + domains = infer_domains_from_rule_metadata(rule_metadata) + by_domain: dict[str, str] = { + d: default_domain_prompt_template(d) for d in domains if d != "core" + } + + return { + "version": 1, + "repo": {"name": infer_repo_name(repo_root)}, + "review": { + "default_base": detect_default_base(repo_root), + "strict_gate": True, + "depth_hotspots": DEFAULT_DEPTH_HOTSPOTS, + "output_root": DEFAULT_OUTPUT_ROOT, + }, + "rules": {"include": rule_patterns}, + "domains": {"default": domains, "allowed": domains}, + "prompts": { + "global": DEFAULT_GLOBAL_PROMPT, + "by_domain": by_domain, + }, + "pipeline": default_pipeline_config(), + } + + +def parse_frontmatter(path: Path) -> dict[str, Any]: + """Parse YAML frontmatter from a file.""" + try: + text = path.read_text(encoding="utf-8", errors="replace") + except OSError: + return {} + + if not text.startswith("---"): + return {} + + match = re.match(r"^---\s*\n(.*?)\n---\s*(?:\n|$)", text, flags=re.DOTALL) + if not match: + return {} + try: + return try_load_yaml(match.group(1)) + except ValueError: + # Rule frontmatter should not fail the entire review bootstrap path. 
+ return {} + + +def _to_boolish(value: Any) -> bool | None: + if isinstance(value, bool): + return value + if isinstance(value, str): + lowered = value.strip().lower() + if lowered in {"true", "1", "yes", "on"}: + return True + if lowered in {"false", "0", "no", "off"}: + return False + return None + + +def _extract_rule_domains(meta: dict[str, Any], rel_path: str) -> list[str]: + domains: list[str] = [] + candidates = [ + meta.get("domain"), + meta.get("domains"), + meta.get("tags"), + meta.get("category"), + meta.get("categories"), + ] + for candidate in candidates: + for item in to_string_list(candidate, []): + normalized = item.strip().lower().replace(" ", "-") + if normalized: + domains.append(normalized) + return unique(domains) + + +def discover_rule_metadata(repo_root: Path, patterns: list[str]) -> list[dict[str, Any]]: + files = discover_rule_files(repo_root, patterns) + rows: list[dict[str, Any]] = [] + for rel in files: + abs_path = repo_root / rel + meta = parse_frontmatter(abs_path) + always_apply = _to_boolish(meta.get("always_apply")) + if always_apply is None: + always_apply = _to_boolish(meta.get("alwaysApply")) + description = str(meta.get("description", "")).strip() + rows.append( + { + "path": rel, + "always_apply": bool(always_apply) if always_apply is not None else False, + "domains": _extract_rule_domains(meta, rel), + "description": description, + } + ) + return rows + + +def infer_domains_from_rule_metadata(rule_metadata: list[dict[str, Any]]) -> list[str]: + domains = {"core"} + for row in rule_metadata: + for domain in to_string_list(row.get("domains"), []): + domains.add(domain) + + result: list[str] = [] + if "core" in domains: + result.append("core") + for domain in sorted(domains): + if domain and domain not in result: + result.append(domain) + return result + + +def sync_profile_with_repo( + raw_profile: dict[str, Any], + repo_root: Path, + *, + prune_autogen: bool, +) -> tuple[dict[str, Any], bool]: + before = 
stable_json(raw_profile) + profile: dict[str, Any] = json.loads(json.dumps(raw_profile)) + inferred = build_bootstrap_profile(repo_root) + + profile_meta = ensure_dict(profile, "profile_meta") + autogen = ensure_dict(profile_meta, "autogen") + prev_autogen_rules = to_string_list(autogen.get("rules_include"), []) + prev_autogen_domains = to_string_list(autogen.get("domains"), []) + prev_prompt_raw = autogen.get("prompt_by_domain") + prev_autogen_prompt_map: dict[str, str] = {} + if isinstance(prev_prompt_raw, dict): + for key, value in prev_prompt_raw.items(): + k = str(key).strip() + if k: + prev_autogen_prompt_map[k] = str(value) + + repo = ensure_dict(profile, "repo") + if not str(repo.get("name", "")).strip(): + repo["name"] = inferred["repo"]["name"] + + review = ensure_dict(profile, "review") + if not str(review.get("default_base", "")).strip(): + review["default_base"] = inferred["review"]["default_base"] + if "strict_gate" not in review: + review["strict_gate"] = True + if "depth_hotspots" not in review: + review["depth_hotspots"] = DEFAULT_DEPTH_HOTSPOTS + if not str(review.get("output_root", "")).strip(): + review["output_root"] = DEFAULT_OUTPUT_ROOT + + rules = ensure_dict(profile, "rules") + existing_patterns = to_string_list(rules.get("include"), []) + inferred_patterns = to_string_list(inferred["rules"]["include"], []) + if prune_autogen and prev_autogen_rules: + existing_patterns = [p for p in existing_patterns if p not in set(prev_autogen_rules)] + rules["include"] = unique(existing_patterns + inferred_patterns) + + domains = ensure_dict(profile, "domains") + existing_allowed = to_string_list(domains.get("allowed"), []) + existing_default = to_string_list(domains.get("default"), []) + inferred_domains = to_string_list(inferred["domains"]["default"], ["core"]) + if prune_autogen and prev_autogen_domains: + prev_domain_set = set(prev_autogen_domains) + existing_allowed = [d for d in existing_allowed if d not in prev_domain_set] + existing_default = [d 
for d in existing_default if d not in prev_domain_set] + + merged_allowed = unique(existing_allowed + inferred_domains) + merged_default = unique(existing_default + inferred_domains) + merged_default = [d for d in merged_default if d in set(merged_allowed)] + if not merged_allowed: + merged_allowed = ["core"] + if not merged_default: + merged_default = ["core"] + domains["allowed"] = merged_allowed + domains["default"] = merged_default + + prompts = ensure_dict(profile, "prompts") + if not str(prompts.get("global", "")).strip(): + prompts["global"] = inferred["prompts"]["global"] + + by_domain = prompts.get("by_domain") + if not isinstance(by_domain, dict): + by_domain = {} + + inferred_by_domain = inferred["prompts"]["by_domain"] + new_autogen_prompt_map = dict(prev_autogen_prompt_map) + for domain in merged_allowed: + if domain not in inferred_by_domain: + continue + inferred_prompt = inferred_by_domain[domain] + existing_prompt = str(by_domain.get(domain, "")).strip() + prev_prompt = str(prev_autogen_prompt_map.get(domain, "")).strip() + if not existing_prompt: + by_domain[domain] = inferred_prompt + elif prev_prompt and existing_prompt == prev_prompt and existing_prompt != inferred_prompt: + by_domain[domain] = inferred_prompt + new_autogen_prompt_map[domain] = inferred_prompt + + if prune_autogen: + for domain in list(by_domain.keys()): + if domain in inferred_by_domain: + continue + prev_prompt = str(prev_autogen_prompt_map.get(domain, "")).strip() + current_prompt = str(by_domain.get(domain, "")).strip() + if prev_prompt and current_prompt == prev_prompt: + del by_domain[domain] + new_autogen_prompt_map.pop(domain, None) + + prompts["by_domain"] = by_domain + + pipeline = ensure_dict(profile, "pipeline") + inferred_pipeline = inferred.get("pipeline") + if isinstance(inferred_pipeline, dict): + for key, value in inferred_pipeline.items(): + if key not in pipeline: + pipeline[key] = value + existing_core_passes = pipeline.get("core_passes") + if not 
isinstance(existing_core_passes, list) or not existing_core_passes: + pipeline["core_passes"] = inferred_pipeline.get("core_passes", []) + + if "version" not in profile: + profile["version"] = 1 + + after_without_meta = stable_json(profile) + changed = before != after_without_meta + + if prune_autogen: + autogen["rules_include"] = inferred_patterns + autogen["domains"] = inferred_domains + autogen["prompt_by_domain"] = { + domain: prompt + for domain, prompt in new_autogen_prompt_map.items() + if domain in inferred_by_domain + } + else: + autogen["rules_include"] = unique(prev_autogen_rules + inferred_patterns) + autogen["domains"] = unique(prev_autogen_domains + inferred_domains) + preserved = dict(prev_autogen_prompt_map) + for domain, prompt in inferred_by_domain.items(): + preserved[domain] = prompt + autogen["prompt_by_domain"] = preserved + + meta = ensure_dict(profile, "profile_meta") + if changed: + meta["managed_by"] = "codexw" + meta["last_synced_utc"] = dt.datetime.now(dt.timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") + meta["sync_mode"] = "merge+prune" if prune_autogen else "merge" + + final_changed = before != stable_json(profile) + return profile, final_changed diff --git a/codexw/prompts.py b/codexw/prompts.py new file mode 100644 index 0000000..a27da0c --- /dev/null +++ b/codexw/prompts.py @@ -0,0 +1,89 @@ +"""Prompt building for codexw. + +This module handles construction of review prompts for each pass type. +Centralizes prompt logic to make it easier to understand and modify. 
+""" + +from __future__ import annotations + +from typing import Any + +from .constants import NO_FINDINGS_SENTINEL + + +def build_base_rubric(repo_name: str) -> str: + """Build the base rubric used in all review passes.""" + return ( + f"Act as a strict PR gate reviewer for {repo_name}.\n" + "Return only actionable findings.\n\n" + "Enforcement order:\n" + "- AGENTS.md instructions\n" + "- Domain-specific internal rule files listed below\n" + "- Engineering correctness and risk\n\n" + "For each finding include:\n" + "- Severity: P0, P1, P2, or P3\n" + "- Type: Bug | Regression | Security | Concurrency | TestGap | RuleViolation\n" + "- File path\n" + "- Precise line number or tight line range\n" + "- Violated rule and rule file path (when applicable)\n" + "- Why this is risky\n" + "- Minimal fix direction\n\n" + "Do not output style-only comments unless they violate a required internal rule.\n" + f'If no findings, output exactly: "{NO_FINDINGS_SENTINEL}".' + ) + + +def build_rule_block(rule_files: list[str]) -> str: + """Build the rule enforcement block for prompts.""" + if not rule_files: + return "Required standards files (read and enforce strictly):\n- (none discovered)" + lines = ["Required standards files (read and enforce strictly):"] + lines.extend([f"- {rule}" for rule in rule_files]) + return "\n".join(lines) + + +def build_diff_context( + changed_files: list[str], + modules: list[tuple[int, str]], + hotspots: list[str], +) -> str: + """Build the diff context block for prompts.""" + mod_lines = "\n".join([f"- {m} ({c} files)" for c, m in modules]) or "- (none)" + hot_lines = "\n".join([f"- {h}" for h in hotspots]) or "- (none)" + file_lines = "\n".join([f"- {f}" for f in changed_files]) or "- (none)" + return ( + "Change context for breadth/depth coverage:\n" + f"- Changed files count: {len(changed_files)}\n" + "- Changed modules:\n" + f"{mod_lines}\n" + "- Top hotspots (by changed lines):\n" + f"{hot_lines}\n" + "- Changed files:\n" + f"{file_lines}" + ) 
+ + +def build_domain_prompt(domain: str, profile: dict[str, Any]) -> str: + """Build domain-specific prompt.""" + custom = profile["domain_prompts"].get(domain, "") + base = ( + f"Domain focus: {domain}\n" + f"- identify domain-specific correctness and policy violations for '{domain}'\n" + "- prioritize regressions and production-risk behavior in changed code" + ) + return base + ("\n" + custom if custom else "") + + +def build_pass_prompt( + base_rubric: str, + rules_block: str, + diff_context: str, + global_prompt: str, + extra: str, +) -> str: + """Compose a complete pass prompt from components.""" + parts = [base_rubric, rules_block, diff_context] + if global_prompt: + parts.append("Profile global context:\n" + global_prompt) + parts.append(extra) + return "\n\n".join([p for p in parts if p.strip()]) diff --git a/codexw/reporting.py b/codexw/reporting.py new file mode 100644 index 0000000..716ebab --- /dev/null +++ b/codexw/reporting.py @@ -0,0 +1,179 @@ +"""Report generation for codexw. + +This module handles writing review artifacts: combined reports, +findings JSON, and per-pass outputs. 
+""" + +from __future__ import annotations + +import datetime as dt +import json +from pathlib import Path +from typing import Any + + +def utc_now() -> dt.datetime: + """Return timezone-aware UTC datetime.""" + return dt.datetime.now(dt.timezone.utc) + + +def write_findings_json( + path: Path, + target_desc: str, + raw_findings: list[dict[str, Any]], +) -> None: + """Write findings to JSON file.""" + path.write_text( + json.dumps( + { + "generated_utc": utc_now().strftime("%Y-%m-%dT%H:%M:%SZ"), + "target": target_desc, + "counts": { + "active": len(raw_findings), + }, + "active_findings": raw_findings, + }, + indent=2, + ) + + "\n", + encoding="utf-8", + ) + + +def write_combined_report( + path: Path, + profile: dict[str, Any], + profile_path: Path, + repo_root: Path, + target_desc: str, + selected_domains: list[str], + rule_files: list[str], + changed_files: list[str], + modules: list[tuple[int, str]], + hotspots: list[str], + depth_hotspots: int, + pass_count: int, + summary_lines: list[str], + raw_findings: list[dict[str, Any]], + findings_json_path: Path, + executed_pass_files: list[Path], + title: str | None = None, + model_override: str | None = None, +) -> None: + """Write the combined markdown report.""" + try: + profile_display = str(profile_path.relative_to(repo_root)) + except ValueError: + profile_display = str(profile_path) + + with path.open("w", encoding="utf-8") as fh: + fh.write("# Codex PR-Grade Multi-Pass Review\n\n") + fh.write(f"- Generated: {utc_now().strftime('%Y-%m-%d %H:%M:%SZ')}\n") + fh.write(f"- Repository context: {profile['repo_name']}\n") + fh.write(f"- Target: {target_desc}\n") + fh.write(f"- Domains: {','.join(selected_domains)}\n") + fh.write(f"- Auto-enforced rule files: {len(rule_files)}\n") + fh.write(f"- Changed files: {len(changed_files)}\n") + fh.write(f"- Depth hotspots: {depth_hotspots}\n") + if title: + fh.write(f"- Title: {title}\n") + if model_override: + fh.write(f"- Model override: {model_override}\n") + fh.write(f"- 
Pass count: {pass_count}\n") + fh.write(f"- Profile file: {profile_display}\n\n") + + fh.write("## Findings Summary\n\n") + fh.write(f"- Active findings: {len(raw_findings)}\n") + fh.write(f"- JSON artifact: {findings_json_path}\n\n") + + fh.write("## Pass Status\n\n") + fh.write("\n".join(summary_lines) + "\n\n") + + fh.write("## Auto-Enforced Rule Files\n\n") + if rule_files: + fh.write("\n".join(rule_files) + "\n\n") + else: + fh.write("(none discovered)\n\n") + + fh.write("## Changed Modules\n\n") + if modules: + fh.write("\n".join([f"{count}\t{module}" for count, module in modules]) + "\n\n") + else: + fh.write("(none)\n\n") + + fh.write("## Changed Files\n\n") + fh.write("\n".join(changed_files) + "\n\n") + + fh.write("## Hotspots\n\n") + fh.write(("\n".join(hotspots) if hotspots else "(none)") + "\n\n") + + # Append outputs from passes executed in this run only. + for pass_file in executed_pass_files: + fh.write(f"## {pass_file.stem}\n\n") + pass_text = pass_file.read_text(encoding="utf-8") + fh.write(pass_text) + if not pass_text.endswith("\n"): + fh.write("\n") + fh.write("\n") + + +def write_empty_report( + path: Path, + profile: dict[str, Any], + target_desc: str, + selected_domains: list[str], +) -> None: + """Write a report for empty diff case.""" + path.write_text( + "\n".join( + [ + "# Codex PR-Grade Multi-Pass Review", + "", + f"- Generated: {utc_now().strftime('%Y-%m-%d %H:%M:%SZ')}", + f"- Repository context: {profile['repo_name']}", + f"- Target: {target_desc}", + f"- Domains: {','.join(selected_domains)}", + "- Changed files: 0", + "", + "No files detected for selected target.", + ] + ) + + "\n", + encoding="utf-8", + ) + + +def write_support_files( + output_root: Path, + rule_files: list[str], + changed_files: list[str], + modules: list[tuple[int, str]], + hotspots: list[str], + summary_lines: list[str], +) -> None: + """Write supporting text files for artifacts.""" + (output_root / "enforced-rule-files.txt").write_text( + "\n".join(rule_files) 
class CodexwError(Exception):
    """Base exception for codexw errors.

    Carries an integer exit status in ``code`` so the CLI entry point can
    translate a raised error directly into a process exit code.
    """

    def __init__(self, message: str, code: int = 1) -> None:
        super().__init__(message)
        # Process exit status to report for this failure (defaults to 1).
        self.code = code
def run_checked(cmd: list[str], cwd: Path) -> str:
    """Run *cmd* in *cwd* and return its captured stdout.

    On a non-zero exit status, reports the command line plus the most
    informative stream (stderr, then stdout) via ``die``, which raises
    ``CodexwError``.
    """
    try:
        proc = subprocess.run(
            cmd,
            cwd=str(cwd),
            text=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            check=True,
        )
    except subprocess.CalledProcessError as exc:
        stderr = exc.stderr.strip()
        stdout = exc.stdout.strip()
        details = stderr or stdout or "command failed"
        # die() always raises, so the return below only runs on success.
        die(f"{' '.join(shlex.quote(x) for x in cmd)} :: {details}")
    return proc.stdout


def run_captured(cmd: list[str], cwd: Path, out_file: Path, *, stream_output: bool) -> int:
    """Run *cmd*, write its combined stdout+stderr to *out_file*, return the exit code.

    stderr is merged into stdout (``stderr=subprocess.STDOUT``) so the artifact
    file preserves interleaved output. When *stream_output* is true the captured
    text is also echoed to this process's stdout.
    """
    proc = subprocess.run(
        cmd,
        cwd=str(cwd),
        text=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        check=False,
    )
    output = proc.stdout or ""
    out_file.write_text(output, encoding="utf-8")
    if stream_output and output:
        print(output, end="")
    return proc.returncode


def shutil_which(name: str) -> str | None:
    """Find executable in PATH (minimal shutil.which replacement).

    Fix: require the candidate to be a regular file (``is_file``), not merely
    an existing path, so a directory on PATH whose name matches *name* is no
    longer returned as an "executable".
    """
    paths = os.environ.get("PATH", "").split(os.pathsep)
    for directory in paths:
        candidate = Path(directory) / name
        if candidate.is_file() and os.access(candidate, os.X_OK):
            return str(candidate)
    return None


def to_bool(value: Any, default: bool) -> bool:
    """Convert *value* to a boolean, falling back to *default*.

    Accepts real booleans and the usual string spellings ("1"/"true"/"yes"/"on"
    and "0"/"false"/"no"/"off", case-insensitive). Anything unrecognized —
    including None — yields *default*.
    """
    if value is None:
        return default
    if isinstance(value, bool):
        return value
    if isinstance(value, str):
        norm = value.strip().lower()
        if norm in {"1", "true", "yes", "on"}:
            return True
        if norm in {"0", "false", "no", "off"}:
            return False
    return default


def to_int(value: Any, default: int) -> int:
    """Convert *value* to a non-negative integer, falling back to *default*.

    Negative values and anything ``int()`` cannot parse yield *default*.
    """
    if value is None:
        return default
    try:
        parsed = int(value)
        return parsed if parsed >= 0 else default
    except (TypeError, ValueError):
        return default
def to_nonempty_string(value: Any, default: str) -> str:
    """Return *value* stripped of whitespace, or *default* if unusable.

    Non-strings and strings that are empty after stripping yield *default*.
    """
    if not isinstance(value, str):
        return default
    stripped = value.strip()
    return stripped or default


def unique(values: list[str]) -> list[str]:
    """De-duplicate *values*, dropping blanks and keeping first-seen order."""
    seen: set[str] = set()
    result: list[str] = []
    for raw in values:
        item = str(raw).strip()
        if item and item not in seen:
            seen.add(item)
            result.append(item)
    return result


def ensure_dict(parent: dict[str, Any], key: str) -> dict[str, Any]:
    """Return ``parent[key]`` as a dict, replacing any non-dict value with {}."""
    existing = parent.get(key)
    if not isinstance(existing, dict):
        existing = {}
        parent[key] = existing
    return existing


def sanitize_pass_id(value: str) -> str:
    """Normalize *value* into a safe pass ID (alphanumeric, dash, underscore).

    Disallowed characters become dashes; leading/trailing dashes are trimmed.
    An empty result collapses to the literal "pass".
    """
    cleaned = re.sub(r"[^a-zA-Z0-9_-]", "-", value.strip()).strip("-")
    return cleaned if cleaned else "pass"


def stable_json(obj: Any) -> str:
    """Serialize *obj* to compact JSON with sorted keys, for stable comparison."""
    return json.dumps(obj, sort_keys=True, separators=(",", ":"))
def _strip_inline_comment(raw: str) -> str:
    """Remove an inline YAML comment (``# ...``) from a single line.

    Walks the line character by character while tracking quote state so a
    ``#`` inside a single- or double-quoted scalar is never treated as a
    comment. A ``#`` starts a comment when it is at column 0, is preceded
    by whitespace, or immediately follows a closed flow collection or a
    closed quoted scalar (e.g. ``[a, b]# note``). Otherwise — as in
    ``https://example.com/#fragment`` — the ``#`` is kept as data.
    """
    text = raw.rstrip()
    in_single = False  # inside a '...'-quoted scalar
    in_double = False  # inside a "..."-quoted scalar
    escaped = False    # previous char was a backslash inside "..."
    idx = 0

    while idx < len(text):
        ch = text[idx]
        if in_double:
            # Inside double quotes: honor backslash escapes, close on '"'.
            if escaped:
                escaped = False
            elif ch == "\\":
                escaped = True
            elif ch == '"':
                in_double = False
            idx += 1
            continue

        if in_single:
            if ch == "'":
                # YAML escapes a quote inside '...' by doubling it ('').
                if idx + 1 < len(text) and text[idx + 1] == "'":
                    idx += 2
                    continue
                in_single = False
            idx += 1
            continue

        if ch == '"':
            in_double = True
        elif ch == "'":
            in_single = True
        elif ch == "#":
            prefix = text[:idx].rstrip()
            # '#' at line start or after whitespace always begins a comment.
            if idx == 0 or text[idx - 1].isspace():
                return text[:idx].rstrip()
            # Check if we're after a closed collection or quoted scalar
            if _is_closed_flow(prefix) or _is_closed_quoted(prefix):
                return text[:idx].rstrip()
        idx += 1
    return text
class _SimpleYamlParser:
    """Simple recursive descent YAML parser.

    Parses the limited YAML subset used by profile files: block mappings,
    block lists, block scalars, and inline values (delegated to the
    module-level helpers ``_parse_scalar`` / ``_strip_inline_comment``).
    The parser keeps a cursor (``self.index``) into ``self.lines`` that the
    ``_parse_*`` methods advance cooperatively — statement order matters.

    NOTE(review): the folded block-scalar indicators (``>`` / ``>-``) are
    accepted but handled exactly like the literal ones (``|`` / ``|-``):
    ``_parse_block_scalar`` joins lines with newlines and never folds them,
    and trailing blank lines are always chomped. Confirm this limitation is
    acceptable before relying on ``>`` semantics in profile files.
    """

    def __init__(self, text: str) -> None:
        # Parser state: the raw lines plus a cursor into them.
        self.lines = text.splitlines()
        self.index = 0

    @staticmethod
    def _indent(line: str) -> int:
        """Return the number of leading spaces on *line* (tabs not handled)."""
        return len(line) - len(line.lstrip(" "))

    @staticmethod
    def _is_ignorable(line: str) -> bool:
        """True for blank lines, full-line comments, and document markers."""
        stripped = line.strip()
        return not stripped or stripped.startswith("#") or stripped in {"---", "..."}

    def _skip_ignorable(self) -> None:
        """Advance the cursor past any ignorable lines."""
        while self.index < len(self.lines) and self._is_ignorable(self.lines[self.index]):
            self.index += 1

    def parse(self) -> dict[str, Any]:
        """Parse the whole document; the top level must be a mapping.

        Returns:
            The parsed mapping ({} for an empty document).

        Raises:
            ValueError: on trailing content after the top-level block, or
                when the document's top level is not a mapping.
        """
        self._skip_ignorable()
        if self.index >= len(self.lines):
            return {}
        start_indent = self._indent(self.lines[self.index])
        value = self._parse_block(start_indent)
        self._skip_ignorable()
        if self.index < len(self.lines):
            raise ValueError(f"unexpected trailing content near line {self.index + 1}")
        if not isinstance(value, dict):
            raise ValueError("top-level YAML must be a mapping")
        return value

    def _parse_block(self, indent: int) -> Any:
        """Parse the block starting at the cursor: a list or a mapping."""
        self._skip_ignorable()
        if self.index >= len(self.lines):
            return {}

        cur_indent = self._indent(self.lines[self.index])
        if cur_indent < indent:
            return {}
        # Accept a deeper actual indent than requested and adopt it.
        indent = max(cur_indent, indent)

        content = self.lines[self.index][indent:]
        # A leading "- " (or a bare "-") marks a list; anything else a map.
        if content == "-" or content.startswith("- "):
            return self._parse_list(indent)
        return self._parse_map(indent)

    def _parse_map(self, indent: int) -> dict[str, Any]:
        """Parse consecutive ``key: value`` entries at exactly *indent*.

        Stops at a dedent or a list item; raises ValueError on a deeper
        indent or an entry without a colon.
        """
        out: dict[str, Any] = {}
        while True:
            self._skip_ignorable()
            if self.index >= len(self.lines):
                break

            line = self.lines[self.index]
            cur_indent = self._indent(line)
            if cur_indent < indent:
                break
            if cur_indent > indent:
                raise ValueError(f"unexpected indentation at line {self.index + 1}")

            content = line[indent:]
            if content == "-" or content.startswith("- "):
                break
            if ":" not in content:
                raise ValueError(f"invalid mapping entry at line {self.index + 1}")

            key, raw_rest = content.split(":", 1)
            key = key.strip()
            rest = _strip_inline_comment(raw_rest).strip()
            self.index += 1

            if not key:
                raise ValueError(f"empty mapping key at line {self.index}")

            if rest in {"|", "|-", ">", ">-"}:
                # Block scalar body is expected two spaces deeper than the key.
                out[key] = self._parse_block_scalar(cur_indent + 2)
            elif rest == "":
                # No inline value: a nested block follows (or None if absent).
                out[key] = self._parse_nested(cur_indent + 2)
            else:
                out[key] = _parse_scalar(rest)

        return out

    def _parse_nested(self, expected_indent: int) -> Any:
        """Parse the nested block under a key, or None for an explicit null."""
        self._skip_ignorable()
        if self.index >= len(self.lines):
            return None

        cur_indent = self._indent(self.lines[self.index])
        if cur_indent < expected_indent:
            # Nothing nested deeper: the key's value was an explicit null.
            return None
        expected_indent = max(cur_indent, expected_indent)

        content = self.lines[self.index][expected_indent:]
        if content == "-" or content.startswith("- "):
            return self._parse_list(expected_indent)
        return self._parse_map(expected_indent)

    def _parse_list(self, indent: int) -> list[Any]:
        """Parse consecutive ``- item`` entries at exactly *indent*.

        Supports scalar items, nested blocks, block scalars, and compact
        inline-map items (``- key: value`` with optional sibling keys).
        """
        out: list[Any] = []
        while True:
            self._skip_ignorable()
            if self.index >= len(self.lines):
                break

            line = self.lines[self.index]
            cur_indent = self._indent(line)
            if cur_indent < indent:
                break
            if cur_indent > indent:
                raise ValueError(f"unexpected indentation at line {self.index + 1}")

            content = line[indent:]
            if not (content == "-" or content.startswith("- ")):
                break

            rest = "" if content == "-" else _strip_inline_comment(content[2:]).strip()
            self.index += 1

            if rest in {"|", "|-", ">", ">-"}:
                out.append(self._parse_block_scalar(indent + 2))
                continue

            if rest == "":
                out.append(self._parse_nested(indent + 2))
                continue

            # Check for inline map in list item (- key: value)
            # The key pattern requires ": " or end-of-line so plain scalars
            # containing colons (e.g. URLs) are not misread as maps.
            inline_match = re.match(r"^([A-Za-z0-9_.-]+):(?:\s+|$)(.*)$", rest)
            if inline_match:
                key = inline_match.group(1).strip()
                tail = _strip_inline_comment(inline_match.group(2)).strip()
                item: dict[str, Any] = {}
                if tail in {"|", "|-", ">", ">-"}:
                    item[key] = self._parse_block_scalar(indent + 4)
                elif tail == "":
                    item[key] = self._parse_nested(indent + 4)
                else:
                    item[key] = _parse_scalar(tail)
                # Sibling keys of the same item sit two spaces past the dash.
                for extra_key, extra_val in self._parse_map(indent + 2).items():
                    item[extra_key] = extra_val
                out.append(item)
                continue

            out.append(_parse_scalar(rest))

        return out

    def _parse_block_scalar(self, indent: int) -> str:
        """Collect a block scalar's lines at *indent* or deeper.

        Blank lines are kept as empty strings; the scalar ends at the first
        dedented non-blank line. Trailing blank lines are chomped, and the
        result never carries a trailing newline (literal and folded styles
        are treated identically — see the class NOTE).
        """
        lines: list[str] = []
        while self.index < len(self.lines):
            raw = self.lines[self.index]
            if raw.strip() == "":
                lines.append("")
                self.index += 1
                continue

            cur_indent = self._indent(raw)
            if cur_indent < indent:
                break

            lines.append(raw[indent:])
            self.index += 1

        while lines and lines[-1] == "":
            lines.pop()
        return "\n".join(lines)
+ +This module emits deterministic YAML for profile files without requiring +PyYAML. It supports the value shapes codexw writes (dict/list/scalars). +""" + +from __future__ import annotations + +import json +import re +from typing import Any + + +def dump_yaml_text(value: Any) -> str: + """Dump a value to YAML text.""" + return "\n".join(_yaml_emit(value)).rstrip() + "\n" + + +def _yaml_plain_scalar_allowed(value: str) -> bool: + """Check if value can be written as plain YAML scalar.""" + if not value or value.strip() != value: + return False + if any(ch in value for ch in ":#{}[]&,*!?|>'\"%@`"): + return False + if value[0] in "-?:!&*@`": + return False + if "\n" in value or "\r" in value or "\t" in value: + return False + lowered = value.lower() + if lowered in {"true", "false", "null", "~", "yes", "no", "on", "off"}: + return False + if re.fullmatch(r"[+-]?\d+(?:\.\d+)?", value): + return False + return True + + +def _yaml_inline_scalar(value: Any) -> str: + """Convert value to inline YAML scalar.""" + if value is None: + return "null" + if isinstance(value, bool): + return "true" if value else "false" + if isinstance(value, (int, float)): + return str(value) + text = str(value) + if _yaml_plain_scalar_allowed(text): + return text + return json.dumps(text) + + +def _yaml_emit(value: Any, indent: int = 0) -> list[str]: + """Emit value as YAML lines.""" + pad = " " * indent + + if isinstance(value, dict): + if not value: + return [pad + "{}"] + lines: list[str] = [] + for key, raw_val in value.items(): + key_text = str(key) + if isinstance(raw_val, str) and "\n" in raw_val: + lines.append(f"{pad}{key_text}: |") + for line in raw_val.splitlines(): + lines.append(" " * (indent + 2) + line) + continue + if isinstance(raw_val, dict): + if raw_val: + lines.append(f"{pad}{key_text}:") + lines.extend(_yaml_emit(raw_val, indent + 2)) + else: + lines.append(f"{pad}{key_text}: {{}}") + continue + if isinstance(raw_val, list): + if raw_val: + 
lines.append(f"{pad}{key_text}:") + lines.extend(_yaml_emit(raw_val, indent + 2)) + else: + lines.append(f"{pad}{key_text}: []") + continue + lines.append(f"{pad}{key_text}: {_yaml_inline_scalar(raw_val)}") + return lines + + if isinstance(value, list): + if not value: + return [pad + "[]"] + lines = [] + for item in value: + if isinstance(item, str) and "\n" in item: + lines.append(f"{pad}- |") + for line in item.splitlines(): + lines.append(" " * (indent + 2) + line) + continue + if isinstance(item, dict): + if not item: + lines.append(f"{pad}- {{}}") + else: + lines.append(f"{pad}-") + lines.extend(_yaml_emit(item, indent + 2)) + continue + if isinstance(item, list): + if not item: + lines.append(f"{pad}- []") + else: + lines.append(f"{pad}-") + lines.extend(_yaml_emit(item, indent + 2)) + continue + lines.append(f"{pad}- {_yaml_inline_scalar(item)}") + return lines + + return [pad + _yaml_inline_scalar(value)] diff --git a/test/codexw_test.py b/test/codexw_test.py new file mode 100644 index 0000000..7420200 --- /dev/null +++ b/test/codexw_test.py @@ -0,0 +1,827 @@ +#!/usr/bin/env python3 +"""Targeted regression tests for codexw.""" + +from __future__ import annotations + +import os +import pathlib +import shutil +import subprocess +import sys +import tempfile +import unittest +from contextlib import redirect_stderr +from io import StringIO +from unittest import mock + +# Add codexw to path +REPO_ROOT = pathlib.Path(__file__).resolve().parents[1] +sys.path.insert(0, str(REPO_ROOT)) + +from codexw.yaml_fallback import parse_simple_yaml, dump_yaml_text +from codexw.cli import build_parser +from codexw.git import collect_numstat, detect_default_base, resolve_base_ref +from codexw.profile import ( + normalize_profile, + default_domain_prompt_template, + build_bootstrap_profile, + infer_domains_from_rule_metadata, +) +from codexw.reporting import write_combined_report +from codexw.passes import ( + ModelFallbackState, + PassSpec, + RetryStrategy, + 
    @staticmethod
    def _snapshot_working_tree_to_git_repo(target_repo_root: pathlib.Path) -> str:
        """Copy this repo's working tree into a fresh git repo and commit it.

        Collects tracked plus untracked-but-not-ignored files from REPO_ROOT
        (via NUL-delimited ``git ls-files``), copies them under
        *target_repo_root*, initializes a new git repository there, commits
        everything, and returns the resulting commit SHA. This lets
        pre-commit integration tests pin ``rev:`` to the current working
        tree rather than a pushed ref.
        """
        # -z gives NUL-delimited output so paths with spaces/newlines survive.
        tracked = subprocess.run(
            ["git", "ls-files", "-z"],
            cwd=REPO_ROOT,
            check=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=False,
        ).stdout.decode("utf-8", errors="replace")
        untracked = subprocess.run(
            ["git", "ls-files", "--others", "--exclude-standard", "-z"],
            cwd=REPO_ROOT,
            check=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=False,
        ).stdout.decode("utf-8", errors="replace")

        # Drop empty entries from the trailing NUL and any directory entries.
        paths = {
            p
            for p in (tracked + untracked).split("\x00")
            if p and not p.endswith("/")
        }

        target_repo_root.mkdir(parents=True, exist_ok=True)
        for rel in sorted(paths):
            src = REPO_ROOT / rel
            # Skip paths that vanished since ls-files ran (or aren't files).
            if not src.is_file():
                continue
            dst = target_repo_root / rel
            dst.parent.mkdir(parents=True, exist_ok=True)
            shutil.copy2(src, dst)

        subprocess.run(
            ["git", "init"],
            cwd=target_repo_root,
            check=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        )
        subprocess.run(["git", "add", "."], cwd=target_repo_root, check=True)
        # Inline -c identity so the commit works without global git config.
        subprocess.run(
            [
                "git",
                "-c",
                "user.name=codexw-test",
                "-c",
                "user.email=codexw-test@example.com",
                "commit",
                "-m",
                "snapshot",
            ],
            cwd=target_repo_root,
            check=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        )
        # The snapshot commit's SHA, used as the pre-commit `rev:` value.
        return subprocess.run(
            ["git", "rev-parse", "HEAD"],
            cwd=target_repo_root,
            check=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        ).stdout.strip()
self.assertEqual(normalized["default_domains"], ["core"]) + self.assertEqual(normalized["allowed_domains"], ["core", "testing"]) + self.assertEqual(normalized["repo_name"], "Repo Name") + + def test_list_item_with_colon_is_not_forced_to_inline_map(self): + parsed = parse_simple_yaml( + """ +values: + - https://example.com +""".strip() + ) + self.assertEqual(parsed["values"], ["https://example.com"]) + + def test_malformed_mapping_indentation_raises(self): + malformed = """ +rules: + include: + - AGENTS.md + - .cursor/rules/**/*.mdc +""".strip() + with self.assertRaises(ValueError): + parse_simple_yaml(malformed) + + def test_malformed_nested_mapping_indentation_raises(self): + malformed = """ +review: + strict_gate: true + depth_hotspots: 3 +""".strip() + with self.assertRaises(ValueError): + parse_simple_yaml(malformed) + + def test_single_quote_escapes_in_flow_items(self): + parsed = parse_simple_yaml( + """ +values: + - ['it''s,ok', core] +""".strip() + ) + self.assertEqual(parsed["values"], [["it's,ok", "core"]]) + + def test_explicit_nulls_do_not_turn_into_empty_maps(self): + parsed = parse_simple_yaml( + """ +review: + default_base: + output_root: +domains: + default: [core] + allowed: [core] +pipeline: + core_passes: + - id: core-breadth + name: Core breadth + instructions: | + test +""".strip() + ) + + self.assertIsNone(parsed["review"]["default_base"]) + self.assertIsNone(parsed["review"]["output_root"]) + + normalized = normalize_profile(parsed) + self.assertEqual(normalized["default_base"], "main") + self.assertEqual(normalized["output_root"], ".codex/review-runs") + + def test_dump_yaml_round_trips_with_fallback_parser(self): + profile = { + "version": 1, + "repo": {"name": "Repo"}, + "review": {"default_base": "main", "strict_gate": True, "depth_hotspots": 2}, + "domains": {"default": ["core"], "allowed": ["core", "testing"]}, + "prompts": { + "global": "Line 1\nLine 2", + "by_domain": {"testing": "Focus on tests"}, + }, + "pipeline": { + 
"include_policy_pass": True, + "include_core_passes": True, + "include_domain_passes": True, + "include_depth_passes": True, + "core_passes": [ + { + "id": "core-breadth", + "name": "Core breadth", + "instructions": "Task:\n- cover all files", + } + ], + }, + } + + dumped = dump_yaml_text(profile) + parsed = parse_simple_yaml(dumped) + + self.assertEqual(parsed["domains"]["allowed"], ["core", "testing"]) + self.assertEqual(parsed["review"]["depth_hotspots"], 2) + self.assertEqual(parsed["prompts"]["global"], "Line 1\nLine 2") + + def test_default_domain_prompt_template_is_repo_agnostic(self): + prompt = default_domain_prompt_template("custom-domain") + self.assertIn("Domain focus: custom-domain", prompt) + self.assertIn("domain-specific correctness and policy compliance", prompt) + self.assertNotIn("FakeUsersRepository", prompt) + self.assertNotIn("Duolingo", prompt) + + def test_bootstrap_profile_uses_generic_domain_prompts(self): + with tempfile.TemporaryDirectory() as tmp: + repo_root = pathlib.Path(tmp) + rules_dir = repo_root / ".cursor" / "rules" + rules_dir.mkdir(parents=True, exist_ok=True) + (rules_dir / "testing-rule.mdc").write_text( + """--- +description: Testing conventions +domain: testing +--- +Use testing standards. 
+""", + encoding="utf-8", + ) + + profile = build_bootstrap_profile(repo_root) + self.assertIn("testing", profile["domains"]["allowed"]) + self.assertEqual( + profile["prompts"]["by_domain"]["testing"], + default_domain_prompt_template("testing"), + ) + + def test_bootstrap_profile_ignores_malformed_rule_frontmatter(self): + with tempfile.TemporaryDirectory() as tmp: + repo_root = pathlib.Path(tmp) + rules_dir = repo_root / ".cursor" / "rules" + rules_dir.mkdir(parents=True, exist_ok=True) + (rules_dir / "broken-rule.mdc").write_text( + """--- +domain: + - testing + - broken +--- +Body +""", + encoding="utf-8", + ) + profile = build_bootstrap_profile(repo_root) + self.assertIn("core", profile["domains"]["allowed"]) + + def test_infer_domains_from_rule_metadata_is_generic(self): + inferred = infer_domains_from_rule_metadata( + [ + {"domains": ["zeta"]}, + {"domains": ["alpha"]}, + ] + ) + self.assertEqual(inferred, ["core", "alpha", "zeta"]) + + def test_script_entrypoint_runs_from_external_cwd(self): + script_path = REPO_ROOT / "codexw" / "__main__.py" + with tempfile.TemporaryDirectory() as tmp: + proc = subprocess.run( + [str(script_path), "review", "--help"], + cwd=tmp, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + ) + self.assertEqual(proc.returncode, 0, msg=proc.stderr) + self.assertIn("usage: codexw review", proc.stdout) + + def test_module_entrypoint_runs_from_repo_root(self): + proc = subprocess.run( + [sys.executable, "-m", "codexw", "review", "--help"], + cwd=str(REPO_ROOT), + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + ) + self.assertEqual(proc.returncode, 0, msg=proc.stderr) + self.assertIn("usage: codexw review", proc.stdout) + + def test_cli_rejects_conflicting_gate_flags(self): + parser = build_parser() + with redirect_stderr(StringIO()): + with self.assertRaises(SystemExit): + parser.parse_args( + ["review", "--fail-on-findings", "--no-fail-on-findings"] + ) + + def 
    def test_uncommitted_numstat_includes_untracked_files(self) -> None:
        """Untracked files must appear in uncommitted-mode numstat with full line counts."""
        with tempfile.TemporaryDirectory() as tmp:
            repo_root = pathlib.Path(tmp)
            # Seed a throwaway git repo with one committed file so HEAD exists.
            subprocess.run(
                ["git", "init"],
                cwd=repo_root,
                check=True,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
            )
            (repo_root / "tracked.txt").write_text("seed\n", encoding="utf-8")
            subprocess.run(["git", "add", "tracked.txt"], cwd=repo_root, check=True)
            subprocess.run(
                [
                    "git",
                    # Inline identity config so the commit works without global git config.
                    "-c",
                    "user.name=codexw-test",
                    "-c",
                    "user.email=codexw-test@example.com",
                    "commit",
                    "-m",
                    "seed",
                ],
                cwd=repo_root,
                check=True,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
            )

            # A brand-new file that is neither staged nor committed (3 lines).
            (repo_root / "new_untracked.py").write_text(
                "line1\nline2\nline3\n",
                encoding="utf-8",
            )

            rows = collect_numstat(repo_root, mode="uncommitted", base="main", commit="")
            # collect_numstat yields (delta, path) pairs — see the unpacking below.
            by_path = {path: delta for delta, path in rows}
            self.assertIn("new_untracked.py", by_path)
            self.assertEqual(by_path["new_untracked.py"], 3)

    def test_detect_default_base_returns_remote_qualified_ref_when_local_missing(self) -> None:
        """With only refs/remotes/origin/main present, the default base is 'origin/main'."""
        def fake_ref_exists(_repo_root, ref):
            return ref == "refs/remotes/origin/main"

        with mock.patch("codexw.git.git_ref_exists", side_effect=fake_ref_exists):
            self.assertEqual(detect_default_base(REPO_ROOT), "origin/main")

    def test_resolve_base_ref_prefers_local_branch_over_remote(self) -> None:
        """When both local and remote refs exist, the bare local name wins."""
        def fake_ref_exists(_repo_root, ref):
            return ref in {"refs/heads/main", "refs/remotes/origin/main"}

        with mock.patch("codexw.git.git_ref_exists", side_effect=fake_ref_exists):
            self.assertEqual(resolve_base_ref(REPO_ROOT, "main"), "main")

    def test_resolve_base_ref_maps_to_origin_when_only_remote_exists(self) -> None:
        """A base that exists only as a remote ref resolves to its origin-qualified form."""
        def fake_ref_exists(_repo_root, ref):
            return ref == "refs/remotes/origin/main"

        with mock.patch("codexw.git.git_ref_exists", side_effect=fake_ref_exists):
            self.assertEqual(resolve_base_ref(REPO_ROOT, "main"), "origin/main")

    def test_combined_report_appends_only_executed_pass_files(self) -> None:
        """Combined report must embed only the passes actually executed this run,
        ignoring stale pass files and the pass-status file left in the output dir."""
        with tempfile.TemporaryDirectory() as tmp:
            repo_root = pathlib.Path(tmp)
            output_root = repo_root / "out"
            output_root.mkdir(parents=True, exist_ok=True)
            profile_path = repo_root / "local-review-profile.yaml"
            profile_path.write_text("version: 1\n", encoding="utf-8")

            current_pass = output_root / "pass-1-current.md"
            stale_pass = output_root / "pass-2-stale.md"
            pass_status = output_root / "pass-status.md"
            combined = output_root / "combined-report.md"
            findings_json = output_root / "findings.json"

            current_pass.write_text("Current pass output\n", encoding="utf-8")
            # Present on disk but NOT in executed_pass_files — must be excluded.
            stale_pass.write_text("Stale pass output\n", encoding="utf-8")
            pass_status.write_text("- [PASS] status\n", encoding="utf-8")

            write_combined_report(
                path=combined,
                profile={"repo_name": "Repo"},
                profile_path=profile_path,
                repo_root=repo_root,
                target_desc="base branch: main",
                selected_domains=["core"],
                rule_files=[],
                changed_files=["a.py"],
                modules=[(1, "a.py")],
                hotspots=[],
                depth_hotspots=0,
                pass_count=1,
                summary_lines=["- [PASS] current"],
                raw_findings=[],
                findings_json_path=findings_json,
                executed_pass_files=[current_pass],
            )

            report = combined.read_text(encoding="utf-8")
            # Section headers are derived from the pass file stems.
            self.assertIn("## pass-1-current", report)
            self.assertNotIn("## pass-2-stale", report)
            self.assertNotIn("## pass-status", report)

    def test_pre_commit_hook_runs_codexw_alias_with_print_effective_profile(self) -> None:
        """End-to-end: the `codexw` alias hook runs under pre-commit in a consumer repo
        and emits the effective profile while auto-creating local-review-profile.yaml."""
        # Opt-out escape hatch for environments without pre-commit available.
        skip_flag = os.environ.get(self._SKIP_PRE_COMMIT_INTEGRATION_ENV, "").strip().lower()
        if skip_flag in {"1", "true", "yes", "on"}:
            self.skipTest(
                "Skipping pre-commit integration test because "
                f"{self._SKIP_PRE_COMMIT_INTEGRATION_ENV}={skip_flag!r}"
            )

        pre_commit_cmd = self._resolve_pre_commit_cmd()

        with tempfile.TemporaryDirectory() as tmp:
            tmp_root = pathlib.Path(tmp)
            hook_repo_root = tmp_root / "hook-repo"
            consumer_root = tmp_root / "consumer-repo"

            # Snapshot this working tree into a git repo so pre-commit can pin a rev.
            repo_rev = self._snapshot_working_tree_to_git_repo(hook_repo_root)
            consumer_root.mkdir(parents=True, exist_ok=True)

            subprocess.run(
                ["git", "init"],
                cwd=consumer_root,
                check=True,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
            )
            (consumer_root / "sample.txt").write_text("sample\n", encoding="utf-8")
            subprocess.run(["git", "add", "sample.txt"], cwd=consumer_root, check=True)
            subprocess.run(
                [
                    "git",
                    "-c",
                    "user.name=codexw-test",
                    "-c",
                    "user.email=codexw-test@example.com",
                    "commit",
                    "-m",
                    "seed",
                ],
                cwd=consumer_root,
                check=True,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
            )

            # Consumer config points at the snapshotted hook repo by path and rev.
            (consumer_root / ".pre-commit-config.yaml").write_text(
                (
                    "repos:\n"
                    f"  - repo: {hook_repo_root}\n"
                    f"    rev: {repo_rev}\n"
                    "    hooks:\n"
                    "      - id: codexw\n"
                    "        args:\n"
                    "          - --print-effective-profile\n"
                ),
                encoding="utf-8",
            )

            proc = subprocess.run(
                [
                    *pre_commit_cmd,
                    "run",
                    "codexw",
                    "--all-files",
                    # The hook is declared with stages: [manual], so it must be
                    # invoked explicitly with the manual hook stage.
                    "--hook-stage",
                    "manual",
                ],
                cwd=consumer_root,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
            )

            if proc.returncode != 0:
                self.fail(
                    "pre-commit codexw hook failed.\n"
                    f"stdout:\n{proc.stdout}\n"
                    f"stderr:\n{proc.stderr}"
                )

            self.assertIn("Codexw (alias)", proc.stdout)
            self.assertIn('"effective_profile"', proc.stdout)
            # The hook auto-generates the profile file in the consumer repo.
            self.assertTrue((consumer_root / "local-review-profile.yaml").is_file())

    def test_recursive_model_fallback_chain(self) -> None:
        """Fallback chain walks minor versions down within a major, then steps to
        the previous major's .2 variant, and contains no duplicates."""
        chain = build_model_fallback_chain("gpt-5.3-codex")
        self.assertEqual(
            chain,
            [
                "gpt-5.3-codex",
                "gpt-5.2-codex",
                "gpt-5.1-codex",
                "gpt-5-codex",
                "gpt-4.2-codex",
            ],
        )
        self.assertEqual(len(chain), len(set(chain)))

    def test_pass_retry_falls_back_model_then_effort(self) -> None:
        """A pass first downgrades the model on model_not_found, then downgrades the
        reasoning effort on an unsupported-effort error, and records both choices."""
        pass_spec = PassSpec(
            id="pass-1-policy-sweep",
            name="Policy pass",
            prompt="Run policy review",
        )
        model_state = ModelFallbackState()

        with tempfile.TemporaryDirectory() as tmp:
            repo_root = pathlib.Path(tmp)
            out_file = repo_root / "pass.md"
            calls: list[list[str]] = []
            # Scripted (exit_code, output) per invocation: model error, effort
            # error, then success.
            responses = [
                (
                    1,
                    "error: model_not_found: The model `gpt-5.3-codex` "
                    "does not exist or you do not have access to it.",
                ),
                (
                    1,
                    "error: model_reasoning_effort 'xhigh' is not supported for this model. "
                    "Supported values: high, medium, low.",
                ),
                (0, "No actionable findings."),
            ]

            def fake_run(cmd, cwd, write_to, stream_output):
                _ = cwd, stream_output
                calls.append(cmd)
                exit_code, text = responses[len(calls) - 1]
                write_to.write_text(text, encoding="utf-8")
                return exit_code

            with mock.patch("codexw.passes.run_captured", side_effect=fake_run):
                run_review_pass_with_compat(
                    repo_root=repo_root,
                    out_file=out_file,
                    target_args=["--uncommitted"],
                    target_desc="uncommitted changes",
                    pass_spec=pass_spec,
                    model_state=model_state,
                )

            self.assertEqual(len(calls), 3)
            # First attempt carries no explicit model override.
            self.assertNotIn('model="', " ".join(calls[0]))
            self.assertIn('model="gpt-5.2-codex"', " ".join(calls[1]))
            self.assertIn('model="gpt-5.2-codex"', " ".join(calls[2]))
            self.assertIn('model_reasoning_effort="high"', " ".join(calls[2]))
            self.assertEqual(model_state.selected_model, "gpt-5.2-codex")
            self.assertEqual(model_state.selected_effort, "high")

    def test_pass_retry_reuses_resolved_model_and_effort(self) -> None:
        """A previously resolved model/effort is reused directly — one call, no retries."""
        pass_spec = PassSpec(
            id="pass-2-core-breadth",
            name="Core breadth",
            prompt="Run core pass",
        )
        # State already carries a resolved fallback from an earlier pass.
        model_state = ModelFallbackState(
            preferred_model="gpt-5.3-codex",
            selected_model="gpt-5.2-codex",
            selected_effort="high",
        )

        with tempfile.TemporaryDirectory() as tmp:
            repo_root = pathlib.Path(tmp)
            out_file = repo_root / "pass.md"
            calls: list[list[str]] = []

            def fake_run(cmd, cwd, write_to, stream_output):
                _ = cwd, stream_output
                calls.append(cmd)
                write_to.write_text("No actionable findings.", encoding="utf-8")
                return 0

            with mock.patch("codexw.passes.run_captured", side_effect=fake_run):
                run_review_pass_with_compat(
                    repo_root=repo_root,
                    out_file=out_file,
                    target_args=["--base", "main"],
                    target_desc="base branch: main",
                    pass_spec=pass_spec,
                    model_state=model_state,
                )

            self.assertEqual(len(calls), 1)
            joined = " ".join(calls[0])
            self.assertIn('model="gpt-5.2-codex"', joined)
            self.assertIn('model_reasoning_effort="high"', joined)

    def test_pass_retry_uses_recent_five_model_window(self) -> None:
        """With an unknown preferred model, fallback walks the chain and succeeds on
        the last (fifth) candidate, which then becomes the selected model."""
        pass_spec = PassSpec(
            id="pass-3-policy-sweep",
            name="Policy pass recent chain",
            prompt="Run policy review",
        )
        model_state = ModelFallbackState(preferred_model="gpt-9.9-codex")

        with tempfile.TemporaryDirectory() as tmp:
            repo_root = pathlib.Path(tmp)
            out_file = repo_root / "pass.md"
            calls: list[list[str]] = []

            fallback_chain = build_model_fallback_chain("gpt-9.9-codex")
            final_model = fallback_chain[-1]

            def fake_run(cmd, cwd, write_to, stream_output):
                _ = cwd, stream_output
                calls.append(cmd)
                joined = " ".join(cmd)
                # Only the last chain entry is "available"; everything else fails
                # with a model_not_found error naming the attempted model.
                if f'model="{final_model}"' in joined:
                    write_to.write_text("No actionable findings.", encoding="utf-8")
                    return 0

                current_model = None
                for model in fallback_chain:
                    if f'model="{model}"' in joined:
                        current_model = model
                        break
                if current_model is None:
                    # First attempt has no explicit model flag; report the
                    # chain head as the missing model.
                    current_model = fallback_chain[0]

                write_to.write_text(
                    "error: model_not_found: The model "
                    f"`{current_model}` does not exist or you do not have access to it.",
                    encoding="utf-8",
                )
                return 1

            with mock.patch("codexw.passes.run_captured", side_effect=fake_run):
                run_review_pass_with_compat(
                    repo_root=repo_root,
                    out_file=out_file,
                    target_args=["--uncommitted"],
                    target_desc="uncommitted changes",
                    pass_spec=pass_spec,
                    model_state=model_state,
                )

            # Recency policy limits fallback to latest five candidate models.
            self.assertEqual(len(calls), 5)
            self.assertIn(f'model="{final_model}"', " ".join(calls[-1]))
            self.assertEqual(model_state.selected_model, final_model)

    def test_pass_retry_does_not_slide_beyond_recent_five_window(self) -> None:
        """When every candidate in the five-model window fails, the pass raises
        CodexwError rather than sliding to older models, and selects nothing."""
        pass_spec = PassSpec(
            id="pass-4-policy-sweep",
            name="Policy pass fixed window",
            prompt="Run policy review",
        )
        model_state = ModelFallbackState(preferred_model="gpt-9.9-codex")

        with tempfile.TemporaryDirectory() as tmp:
            repo_root = pathlib.Path(tmp)
            out_file = repo_root / "pass.md"
            calls: list[list[str]] = []
            expected_chain = build_model_fallback_chain("gpt-9.9-codex")

            def fake_run(cmd, cwd, write_to, stream_output):
                _ = cwd, stream_output
                calls.append(cmd)
                joined = " ".join(cmd)

                # Every attempt fails with model_not_found for whichever chain
                # model the command names (chain head when no flag present).
                current_model = expected_chain[0]
                for model in expected_chain:
                    if f'model="{model}"' in joined:
                        current_model = model
                        break

                write_to.write_text(
                    "error: model_not_found: The model "
                    f"`{current_model}` does not exist or you do not have access to it.",
                    encoding="utf-8",
                )
                return 1

            with mock.patch("codexw.passes.run_captured", side_effect=fake_run):
                with self.assertRaises(CodexwError):
                    run_review_pass_with_compat(
                        repo_root=repo_root,
                        out_file=out_file,
                        target_args=["--uncommitted"],
                        target_desc="uncommitted changes",
                        pass_spec=pass_spec,
                        model_state=model_state,
                    )

            # Exactly one attempt per chain entry, in chain order.
            self.assertEqual(len(calls), len(expected_chain))
            for idx, model in enumerate(expected_chain):
                self.assertIn(f'model="{model}"', " ".join(calls[idx]))
            self.assertIsNone(model_state.selected_model)

    def test_previous_major_includes_dot_two_variant(self) -> None:
        """Crossing a major boundary continues into the previous major's .2/.1 variants."""
        chain = build_model_fallback_chain("gpt-4.2-codex")
        self.assertEqual(
            chain,
            [
                "gpt-4.2-codex",
                "gpt-4.1-codex",
                "gpt-4-codex",
                "gpt-3.2-codex",
                "gpt-3.1-codex",
            ],
        )

    def test_reasoning_effort_unsupported_detects_reasoning_dot_effort_param(self) -> None:
        """A JSON API error with param 'reasoning.effort' is classified as an
        unsupported-effort failure, and the best supported effort is extracted."""
        output = """
ERROR: {
  "error": {
    "message": "Unsupported value: 'xhigh' is not supported with the 'gpt-5-codex' model. Supported values are: 'low', 'medium', and 'high'.",
    "type": "invalid_request_error",
    "param": "reasoning.effort",
    "code": "unsupported_value"
  }
}
""".strip()
        self.assertTrue(RetryStrategy.reasoning_effort_unsupported(output))
        self.assertEqual(extract_supported_effort_from_output(output), "high")

    def test_extract_configured_effort_from_reasoning_effort_banner(self) -> None:
        """The configured effort is read from the CLI's 'reasoning effort:' banner line."""
        output = """
model: gpt-5-codex
reasoning effort: xhigh
""".strip()
        self.assertEqual(extract_configured_effort_from_output(output), "xhigh")

    def test_model_unavailable_detects_chatgpt_model_not_supported_message(self) -> None:
        """The ChatGPT-account 'model is not supported' error counts as model-unavailable."""
        output = (
            "ERROR: {\"detail\":\"The 'gpt-0-codex' model is not supported when using Codex "
            "with a ChatGPT account.\"}"
        )
        self.assertTrue(RetryStrategy.model_unavailable(output))

    def test_model_unavailable_does_not_trigger_on_reasoning_effort_error(self) -> None:
        """An unsupported-effort error mentioning the model name must NOT be
        misclassified as the model itself being unavailable."""
        output = """
ERROR: {
  "error": {
    "message": "Unsupported value: 'xhigh' is not supported with the 'gpt-5-codex' model.",
    "param": "reasoning.effort",
    "code": "unsupported_value"
  }
}
""".strip()
        self.assertFalse(RetryStrategy.model_unavailable(output))


if __name__ == "__main__":
    unittest.main()