diff --git a/tools/README.md b/tools/README.md new file mode 100644 index 0000000..1d34cd5 --- /dev/null +++ b/tools/README.md @@ -0,0 +1,174 @@ +# OGC Building Block Schema Tools + +Standalone Python tools for resolving and validating [OGC Building Block](https://opengeospatial.github.io/bblocks/) schemas. + +## Background + +The `bblocks-postprocess` Docker tool (used by all OGC building block repositories) generates annotated schemas in `build/annotated/`, but these still contain `$ref` references to remote URLs. There is currently no standard way for building block authors to produce a **fully-resolved, self-contained JSON Schema** suitable for local validation, tooling integration, or inspection. + +These tools fill that gap: + +| Tool | Purpose | +|------|---------| +| `resolve_schema.py` | Recursively resolve all `$ref` in a building block schema into a single self-contained JSON Schema | +| `compare_schemas.py` | Compare `schema.yaml` source files against companion JSON schemas for consistency | + +## Prerequisites + +- Python 3.6+ +- [pyyaml](https://pypi.org/project/PyYAML/) (`pip install pyyaml`) + +## resolve_schema.py + +Recursively resolves ALL `$ref` references from modular YAML/JSON source schemas into one fully-inlined JSON Schema. + +### `$ref` patterns handled + +1. **Relative path:** `$ref: ../detailEMPA/schema.yaml` +2. **Fragment-only:** `$ref: '#/$defs/Identifier'` +3. **Cross-file fragment:** `$ref: ../metaMetadata/schema.yaml#/$defs/conformsTo_item` +4. **Both YAML and JSON** file extensions +5. 
**`bblocks://` URI:** `$ref: bblocks://ogc.geo.features.feature` — cross-building-block references using OGC's `bblocks://` (or `bblocks:`) URI scheme, resolved via the `identifier-prefix` in `bblocks-config.yaml` + +### Usage + +```bash +# Resolve a building block by name (searches _sources/ automatically) +python tools/resolve_schema.py --bblock dataDownload + +# Resolve an arbitrary schema file by path +python tools/resolve_schema.py --file _sources/myFeature/schema.yaml + +# Write to a file instead of stdout +python tools/resolve_schema.py --bblock dataDownload -o resolved.json + +# Flatten allOf entries into merged objects +python tools/resolve_schema.py --file schema.yaml --flatten-allof -o resolved.json + +# Keep metadata keys ($id, x-jsonld-*, etc.) that are stripped by default +python tools/resolve_schema.py --file schema.yaml --keep-metadata + +# Custom _sources directory +python tools/resolve_schema.py --bblock myBlock --sources-dir path/to/_sources +``` + +### Options + +| Option | Description | +|--------|-------------| +| `--file PATH` | Resolve a schema file by path (mutually exclusive with `--bblock`) | +| `--bblock NAME` | Resolve a building block by name (mutually exclusive with `--file`) | +| `--sources-dir PATH` | Path to `_sources/` directory (auto-detected if omitted) | +| `-o, --output PATH` | Write output to file (default: stdout) | +| `--flatten-allof` | Merge `allOf` entries into single objects | +| `--keep-metadata` | Preserve `$id`, `x-jsonld-*`, and other metadata keys | +| `--strip-keys KEY ...` | Custom set of keys to strip (overrides defaults; ignored with `--keep-metadata`) | + +### Building block discovery + +When using `--bblock`, the tool searches for the schema in this order: + +1. `{sources_dir}/{name}/schema.yaml` (flat layout) +2. `{sources_dir}/{name}/schema.json` (flat layout, JSON) +3. `{sources_dir}/**/{name}/schema.yaml` (nested layout, filtered by `bblock.json` presence) +4. 
`{sources_dir}/**/{name}/schema.json` (nested layout, JSON fallback) + +### `bblocks://` cross-building-block references + +Many OGC building block schemas reference other building blocks using the `bblocks://` URI scheme (e.g., `$ref: bblocks://ogc.geo.common.data_types.bounding_box`). The resolver handles these automatically by: + +1. Reading `bblocks-config.yaml` from the repo root (parent of the sources directory) to get the `identifier-prefix` (e.g., `ogc.`) +2. Scanning all `bblock.json` files to build an index mapping identifiers to schema paths +3. Resolving `bblocks://` refs by looking up the identifier in the index + +This works for all local building blocks within the same repository. References to building blocks from imported registries (external repos) will produce a `$comment` noting they could not be resolved locally. + +Fragment refs are also supported: `$ref: bblocks://ogc.geo.features.feature#/$defs/Something` + +### Example: validate data against resolved schema + +```bash +# Resolve, then validate +python tools/resolve_schema.py --bblock myFeature -o resolved.json +python -c " +import json, jsonschema +schema = json.load(open('resolved.json')) +data = json.load(open('example.json')) +jsonschema.validate(data, schema) +print('Valid!') +" +``` + +## compare_schemas.py + +Compares `schema.yaml` source files against their companion JSON schemas (`{blockName}Schema.json` or `schema.json`) to detect structural inconsistencies. + +### What it checks + +- Missing or extra properties in either file +- Top-level `type` mismatches +- `required` field differences +- Constraint mismatches (enum, const, etc.) 
+- Description drift (case-insensitive comparison) + +### What it skips + +- `$ref` path differences (YAML uses `.yaml`, JSON uses `.json` or `$defs`) +- `$defs` presence (JSON schemas commonly use `$defs` for ref indirection) + +### Usage + +```bash +# Auto-detect _sources/ directory +python tools/compare_schemas.py + +# Specify _sources/ directory explicitly +python tools/compare_schemas.py --sources-dir _sources + +# Use with a custom path +python tools/compare_schemas.py --sources-dir path/to/my/_sources +``` + +### Options + +| Option | Description | +|--------|-------------| +| `--sources-dir PATH` | Path to `_sources/` directory (auto-detected if omitted) | + +### Building block discovery + +The tool recursively searches for `bblock.json` files under `_sources/`. Each directory containing `bblock.json` is treated as a building block. This works with both: + +- **Flat layouts:** `_sources/myBlock/bblock.json` +- **Nested layouts:** `_sources/category/myBlock/bblock.json` + +### Companion JSON detection + +For each building block directory, the tool looks for the companion JSON schema in this order: + +1. `{blockName}Schema.json` (e.g., `dataDownloadSchema.json`) +2. `{blockName}schema.json` (case variant) +3. `schema.json` (generic name) +4. 
Case-insensitive fallback + +## Comparison: bblocks-postprocess vs these tools + +| | bblocks-postprocess | resolve_schema.py | +|---|---|---| +| **Runs as** | Docker container | Standalone Python script | +| **Input** | Full repo build | Single schema file or building block name | +| **Output** | `build/annotated/` with remote `$ref` URLs | Fully-resolved, self-contained JSON Schema | +| **Remote refs** | Rewrites to absolute URLs | Resolves to inline definitions | +| **`bblocks://` refs** | Resolved via registry imports | Resolved locally via `bblocks-config.yaml` index | +| **Use case** | CI/CD pipeline, publishing | Local validation, tooling integration, inspection | + +## Installation + +Copy the `tools/` directory into your OGC building block repository: + +```bash +cp -r tools/ /path/to/your-bblock-repo/tools/ +pip install pyyaml +``` + +Or add to your existing `tools/` directory alongside other build scripts. diff --git a/tools/compare_schemas.py b/tools/compare_schemas.py new file mode 100644 index 0000000..3072064 --- /dev/null +++ b/tools/compare_schemas.py @@ -0,0 +1,363 @@ +#!/usr/bin/env python3 +""" +Compare schema.yaml against companion JSON schemas for each OGC building block. + +Reports structural inconsistencies: missing/extra properties, type mismatches, +different constraints (required, enum, const), and description drift. + +$ref paths are expected to differ (YAML -> .yaml, JSON -> .json or $defs) +and are not flagged as errors. + +The script auto-discovers building blocks by recursively searching for +bblock.json files under the _sources directory. It supports both flat layouts +(_sources/myBlock/) and nested layouts (_sources/category/myBlock/). + +Usage: + python tools/compare_schemas.py + python tools/compare_schemas.py --sources-dir _sources + python tools/compare_schemas.py --sources-dir path/to/custom/_sources +""" + +import json +import os +import sys + +try: + import yaml +except ImportError: + print("ERROR: pyyaml required. 
Install with: pip install pyyaml")
    # NOTE(review): this error message goes to stdout; resolve_schema.py prints
    # its equivalent to stderr — consider aligning (file=sys.stderr).
    sys.exit(1)


# Keys that are expected to differ between YAML and JSON representations
# NOTE(review): REF_KEYS and DESCRIPTION_KEYS appear unused in this module —
# compare_dicts tests the "$ref"/"description" keys literally. Confirm before removing.
REF_KEYS = {"$ref"}
# Keys where minor wording changes are acceptable
DESCRIPTION_KEYS = {"description"}


def find_companion_json(block_dir, block_name):
    """Find the companion JSON schema for a building block.

    Looks for:
    1. {blockName}Schema.json (e.g., dataDownloadSchema.json)
    2. {blockName}schema.json (case variant)
    3. schema.json (generic name)
    4. Case-insensitive fallback for {blockName}Schema.json

    Returns the path of the first match, or None when no companion exists.
    """
    # Try {blockName}Schema.json / {blockName}schema.json
    for pattern in [
        f"{block_name}Schema.json",
        f"{block_name}schema.json",
    ]:
        candidate = os.path.join(block_dir, pattern)
        if os.path.exists(candidate):
            return candidate

    # Try generic schema.json
    generic = os.path.join(block_dir, "schema.json")
    if os.path.exists(generic):
        return generic

    # Case-insensitive fallback
    lower = f"{block_name}schema.json".lower()
    try:
        for f in os.listdir(block_dir):
            if f.lower() == lower:
                return os.path.join(block_dir, f)
    except OSError:
        # block_dir unreadable or missing -- treat as "no companion found"
        pass

    return None


def find_building_blocks(sources_dir):
    """Discover all building blocks under sources_dir.

    Walks the directory tree looking for bblock.json files. Each directory
    containing bblock.json is treated as a building block. Works with both
    flat layouts (_sources/myBlock/) and nested layouts
    (_sources/category/myBlock/).

    Returns a sorted list of (display_name, block_dir, block_name) tuples.
+ """ + blocks = [] + sources_dir = os.path.abspath(sources_dir) + + for root, dirs, files in os.walk(sources_dir): + if "bblock.json" in files: + block_dir = root + block_name = os.path.basename(block_dir) + # Create a display name showing path relative to sources_dir + rel_path = os.path.relpath(block_dir, sources_dir) + blocks.append((rel_path, block_dir, block_name)) + + return sorted(blocks) + + +def compare_values(yaml_val, json_val, path): + """Compare two schema values, returning a list of differences.""" + diffs = [] + + if isinstance(yaml_val, dict) and isinstance(json_val, dict): + diffs.extend(compare_dicts(yaml_val, json_val, path)) + elif isinstance(yaml_val, list) and isinstance(json_val, list): + diffs.extend(compare_lists(yaml_val, json_val, path)) + elif yaml_val != json_val: + diffs.append(f" {path}: YAML={repr(yaml_val)} vs JSON={repr(json_val)}") + + return diffs + + +def compare_lists(yaml_list, json_list, path): + """Compare two lists element by element.""" + diffs = [] + if len(yaml_list) != len(json_list): + diffs.append( + f" {path}: array length differs: YAML={len(yaml_list)} vs JSON={len(json_list)}" + ) + for i in range(min(len(yaml_list), len(json_list))): + diffs.extend(compare_values(yaml_list[i], json_list[i], f"{path}[{i}]")) + return diffs + + +def compare_dicts(yaml_dict, json_dict, path): + """Compare two dicts, skipping $ref differences and noting structural issues.""" + diffs = [] + + yaml_keys = set(yaml_dict.keys()) + json_keys = set(json_dict.keys()) + + # If one side has a $ref, skip deep comparison (different ref styles expected) + if "$ref" in yaml_keys or "$ref" in json_keys: + # Both have $ref -- that's fine, paths will differ + if "$ref" in yaml_keys and "$ref" in json_keys: + return [] + # One has $ref, other is expanded or uses $defs -- note but don't error + if "$ref" in yaml_keys and "$ref" not in json_keys: + diffs.append(f" {path}: YAML has $ref, JSON has inline definition") + return diffs + if "$ref" not in 
yaml_keys and "$ref" in json_keys: + diffs.append(f" {path}: JSON has $ref, YAML has inline definition") + return diffs + + # Check for $defs in JSON (expected pattern for ref indirection) + yaml_no_defs = {k: v for k, v in yaml_dict.items() if k != "$defs"} + json_no_defs = {k: v for k, v in json_dict.items() if k != "$defs"} + + yaml_keys_compare = set(yaml_no_defs.keys()) + json_keys_compare = set(json_no_defs.keys()) + + only_yaml = yaml_keys_compare - json_keys_compare + only_json = json_keys_compare - yaml_keys_compare + + if only_yaml: + for k in sorted(only_yaml): + diffs.append(f" {path}: property '{k}' in YAML only") + if only_json: + for k in sorted(only_json): + diffs.append(f" {path}: property '{k}' in JSON only") + + # Compare shared keys + for key in sorted(yaml_keys_compare & json_keys_compare): + child_path = f"{path}.{key}" if path else key + y_val = yaml_dict[key] + j_val = json_dict[key] + + # Skip description wording differences (flag only if one is missing) + if key == "description" and isinstance(y_val, str) and isinstance(j_val, str): + if y_val.strip().lower() != j_val.strip().lower(): + diffs.append( + f" {child_path}: description differs:" + f"\n YAML: {y_val[:80]}" + f"\n JSON: {j_val[:80]}" + ) + continue + + diffs.extend(compare_values(y_val, j_val, child_path)) + + return diffs + + +def check_property_coverage(yaml_schema, json_schema, block_name): + """High-level check: do both schemas define the same top-level properties?""" + issues = [] + + yaml_props = set((yaml_schema.get("properties") or {}).keys()) + json_props = set((json_schema.get("properties") or {}).keys()) + + only_yaml = yaml_props - json_props + only_json = json_props - yaml_props + + if only_yaml: + issues.append(f" Properties in YAML only: {sorted(only_yaml)}") + if only_json: + issues.append(f" Properties in JSON only: {sorted(only_json)}") + + # Check top-level type + if yaml_schema.get("type") != json_schema.get("type"): + issues.append( + f" Top-level type: 
YAML={yaml_schema.get('type')} vs JSON={json_schema.get('type')}" + ) + + # Check required fields + yaml_req = extract_required(yaml_schema) + json_req = extract_required(json_schema) + if yaml_req != json_req: + only_y = yaml_req - json_req + only_j = json_req - yaml_req + if only_y: + issues.append(f" Required in YAML only: {sorted(only_y)}") + if only_j: + issues.append(f" Required in JSON only: {sorted(only_j)}") + + return issues + + +def extract_required(schema): + """Extract all required field names from a schema, including nested allOf/anyOf.""" + required = set(schema.get("required", [])) + for entry in schema.get("allOf", []): + required.update(entry.get("required", [])) + return required + + +def _detect_sources_dir(): + """Auto-detect the _sources directory relative to the script or CWD.""" + # Relative to script location (tools/ lives next to _sources/) + script_based = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "_sources") + if os.path.isdir(script_based): + return script_based + + # Relative to CWD + cwd_based = os.path.join(os.getcwd(), "_sources") + if os.path.isdir(cwd_based): + return cwd_based + + return script_based # Fall back; will produce clear error later + + +def main(): + import argparse + + parser = argparse.ArgumentParser( + description="Compare schema.yaml against companion JSON schemas for OGC building blocks.", + ) + parser.add_argument( + "--sources-dir", + default=None, + help="Path to the _sources directory (auto-detected if omitted)", + ) + args = parser.parse_args() + + sources_dir = args.sources_dir or _detect_sources_dir() + sources_dir = os.path.abspath(sources_dir) + + if not os.path.isdir(sources_dir): + print(f"ERROR: Sources directory not found: {sources_dir}", file=sys.stderr) + sys.exit(1) + + blocks = find_building_blocks(sources_dir) + if not blocks: + print(f"No building blocks found in {sources_dir}", file=sys.stderr) + sys.exit(1) + + total = 0 + checked = 0 + passed = 0 + failed = 0 + 
skipped_no_json = 0 + results = [] + + for display_name, block_dir, block_name in blocks: + total += 1 + yaml_path = os.path.join(block_dir, "schema.yaml") + json_path = find_companion_json(block_dir, block_name) + + if not os.path.exists(yaml_path): + results.append((display_name, "SKIP", ["No schema.yaml"])) + continue + + if not json_path: + skipped_no_json += 1 + continue + + checked += 1 + json_filename = os.path.basename(json_path) + + # Load both + try: + with open(yaml_path, "r", encoding="utf-8") as f: + yaml_schema = yaml.safe_load(f) + except Exception as e: + results.append( + (display_name, "ERROR", [f"YAML parse error: {e}"]) + ) + failed += 1 + continue + + try: + with open(json_path, "r", encoding="utf-8") as f: + json_schema = json.load(f) + except Exception as e: + results.append( + ( + display_name, + "ERROR", + [f"JSON parse error ({json_filename}): {e}"], + ) + ) + failed += 1 + continue + + # Compare + issues = [] + issues.extend(check_property_coverage(yaml_schema, json_schema, block_name)) + issues.extend(compare_dicts(yaml_schema, json_schema, "")) + + if issues: + results.append((display_name, "DIFF", issues)) + failed += 1 + else: + results.append((display_name, "OK", [])) + passed += 1 + + # Report + print("=" * 70) + print("OGC Building Block Schema Consistency Report") + print(f" schema.yaml vs companion JSON schema") + print(f" Sources: {sources_dir}") + print("=" * 70) + print( + f"\nTotal blocks: {total} | Checked: {checked} | " + f"Passed: {passed} | Differences: {failed} | " + f"No JSON schema: {skipped_no_json}\n" + ) + + # Show passes + ok_results = [r for r in results if r[1] == "OK"] + if ok_results: + print(f"--- CONSISTENT ({len(ok_results)}) ---") + for name, status, _ in ok_results: + print(f" OK {name}") + print() + + # Show diffs + diff_results = [r for r in results if r[1] in ("DIFF", "ERROR")] + if diff_results: + print(f"--- DIFFERENCES ({len(diff_results)}) ---") + for name, status, issues in diff_results: + 
print(f"\n {status} {name}") + for issue in issues: + print(f" {issue}") + print() + + if failed: + print(f"\n{failed} building block(s) have inconsistencies.") + return 1 + else: + print("\nAll checked building blocks are consistent.") + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/tools/resolve_schema.py b/tools/resolve_schema.py new file mode 100644 index 0000000..403f761 --- /dev/null +++ b/tools/resolve_schema.py @@ -0,0 +1,605 @@ +#!/usr/bin/env python3 +""" +Resolve OGC Building Block schemas into a single complete JSON Schema. + +Recursively resolves ALL $ref references from the modular YAML/JSON source +schemas into one fully-inlined schema -- purely for validation and inspection, +with no form simplifications. + +The OGC bblocks-postprocess Docker tool generates annotated schemas in +build/annotated/, but these still contain $ref references to remote URLs. +This script fills the gap by producing a fully-resolved, self-contained +JSON Schema from the source files in _sources/. + +$ref patterns handled: + 1. Relative path: $ref: ../detailEMPA/schema.yaml + 2. Fragment-only: $ref: '#/$defs/Identifier' + 3. Cross-file fragment: $ref: ../metaMetadata/schema.yaml#/$defs/conformsTo_item + 4. Both YAML and JSON file extensions + 5. 
bblocks:// URI: $ref: bblocks://ogc.geo.features.feature + (resolved via bblocks-config.yaml identifier-prefix) + +Usage: + python tools/resolve_schema.py --file _sources/myFeature/schema.yaml + python tools/resolve_schema.py --bblock myFeature + python tools/resolve_schema.py --bblock myFeature --sources-dir _sources + python tools/resolve_schema.py --file schema.yaml --flatten-allof -o resolved.json + python tools/resolve_schema.py --file schema.yaml --keep-metadata +""" + +import argparse +import copy +import json +import sys +from pathlib import Path +from typing import Any + +try: + import yaml +except ImportError: + print( + "ERROR: pyyaml is required but not installed.\n" + "Install with: pip install pyyaml", + file=sys.stderr, + ) + sys.exit(1) + +# Keys to strip from schemas by default (metadata, not useful for validation) +DEFAULT_STRIP_KEYS = {"$id", "x-jsonld-prefixes", "x-jsonld-context", "x-jsonld-extra-terms"} + + +# --------------------------------------------------------------------------- +# File loading +# --------------------------------------------------------------------------- + +def load_schema_file(path: Path) -> dict: + """Load a schema file (YAML or JSON) based on extension.""" + with open(path, "r", encoding="utf-8") as f: + if path.suffix in (".yaml", ".yml"): + return yaml.safe_load(f) or {} + else: + return json.load(f) + + +# --------------------------------------------------------------------------- +# JSON Pointer resolution +# --------------------------------------------------------------------------- + +def resolve_fragment(schema: dict, pointer: str) -> Any: + """Resolve a JSON Pointer (e.g., '/$defs/Identifier') within a schema.""" + parts = pointer.lstrip("/").split("/") + current = schema + for part in parts: + if isinstance(current, dict) and part in current: + current = current[part] + elif isinstance(current, list): + current = current[int(part)] + else: + raise KeyError(f"Cannot resolve pointer /{'/'.join(parts)} at part 
'{part}'") + return current + + +# --------------------------------------------------------------------------- +# Metadata stripping +# --------------------------------------------------------------------------- + +def strip_metadata_keys(schema: Any, strip_keys: set = None, is_root: bool = True) -> Any: + """Recursively remove $id, x-jsonld-*, and nested $schema keys. + + Parameters + ---------- + schema : Any + The schema node to process. + strip_keys : set, optional + Set of top-level key names to strip. Defaults to DEFAULT_STRIP_KEYS. + Keys starting with ``x-jsonld`` are always stripped regardless. + is_root : bool + Whether this is the root schema node (preserves $schema at root). + """ + if strip_keys is None: + strip_keys = DEFAULT_STRIP_KEYS + if isinstance(schema, dict): + result = {} + for k, v in schema.items(): + if k in strip_keys: + continue + if k.startswith("x-jsonld"): + continue + if k == "$schema" and not is_root: + continue + result[k] = strip_metadata_keys(v, strip_keys=strip_keys, is_root=False) + return result + elif isinstance(schema, list): + return [strip_metadata_keys(item, strip_keys=strip_keys, is_root=False) for item in schema] + return schema + + +# --------------------------------------------------------------------------- +# Deep merge (for allOf flattening) +# --------------------------------------------------------------------------- + +_SCHEMA_DEF_KEYS = frozenset({"type", "oneOf", "anyOf", "allOf", "$ref"}) + + +def _is_complete_schema(d: dict) -> bool: + """Return True if d looks like a complete schema definition (has type, composition, or $ref).""" + return bool(d.keys() & _SCHEMA_DEF_KEYS) + + +def deep_merge(base: dict, overlay: dict) -> dict: + """ + Deep merge overlay into base. Overlay values take precedence. + For dicts, merge recursively. For everything else, overlay replaces base. 
+ + Special handling for ``properties`` dicts: when an overlay provides a + property definition that already exists in the base AND the overlay looks + like a complete schema definition (has ``type``, ``oneOf``, ``anyOf``, + ``allOf``, or ``$ref``), the overlay **replaces** the base definition + entirely. This prevents invalid schemas where, e.g., two composed schemas + define the same property with incompatible composition keywords. + + When the overlay is a partial constraint patch (no ``type`` or composition + keywords at the property level -- just nested ``items.properties...``), it is + deep-merged so that the base structure (``type``, ``description``, ``oneOf``, + etc.) is preserved alongside the new constraints. + """ + return _deep_merge_inner(base, overlay, in_properties=False) + + +def _deep_merge_inner(base: dict, overlay: dict, in_properties: bool) -> dict: + result = copy.deepcopy(base) + for k, v in overlay.items(): + if k in result and isinstance(result[k], dict) and isinstance(v, dict): + if in_properties and _is_complete_schema(v): + # Complete property definition -> replace entirely + result[k] = copy.deepcopy(v) + elif k == "properties": + result[k] = _deep_merge_inner(result[k], v, in_properties=True) + else: + result[k] = _deep_merge_inner(result[k], v, in_properties=False) + else: + result[k] = copy.deepcopy(v) + return result + + +# --------------------------------------------------------------------------- +# Core resolution +# --------------------------------------------------------------------------- + +def resolve_file(path: Path, seen: set, bblock_index: dict = None, keep_defs: bool = False) -> dict: + """Load a YAML or JSON schema file and resolve all $ref within it. + + Parameters + ---------- + keep_defs : bool + If True, preserve the resolved ``$defs`` dict in the output. This is + needed when the caller will apply a JSON Pointer fragment that targets + ``$defs`` entries (e.g., ``#/$defs/MultipleOrObjectOrNull``). 
+ """ + canonical = path.resolve() + if canonical in seen: + return {"$comment": f"circular ref to {canonical}"} + seen = seen | {canonical} # Copy to avoid mutation across branches + + schema = load_schema_file(canonical) + if not isinstance(schema, dict): + return schema + + # Resolve $defs so fragment-only refs (#/$defs/X) can find them. + # Two-pass strategy: + # Pass 1 -- resolve every def with an empty local-defs dict. This expands + # all external file $refs but leaves cross-def fragment refs as + # "$comment: unresolved ..." placeholders. + # Pass 2 -- re-resolve every def, this time with the fully-populated defs + # dict so that cross-def fragment refs can be found. + defs = {} + if "$defs" in schema: + raw_defs = schema["$defs"] + for def_name, def_schema in raw_defs.items(): + defs[def_name] = resolve_node(def_schema, canonical.parent, {}, seen, bblock_index) + # Pass 2: re-resolve with full defs. Because pass 1 may have left + # "$comment" placeholders instead of the resolved content, we also + # inline those placeholders by re-walking the defs. + for def_name in list(defs.keys()): + defs[def_name] = _inline_unresolved_defs(defs[def_name], defs, canonical.parent, seen, bblock_index) + + # Walk and resolve the entire schema + resolved = resolve_node(schema, canonical.parent, defs, seen, bblock_index) + + # Remove $defs from final output (they've been inlined), unless the + # caller needs them for fragment resolution. 
+ if isinstance(resolved, dict) and not keep_defs: + resolved.pop("$defs", None) + + return resolved + + +def resolve_node(node: Any, base_dir: Path, defs: dict, seen: set, bblock_index: dict = None) -> Any: + """Recursively resolve $ref in a schema node.""" + if isinstance(node, dict): + if "$ref" in node: + ref = node["$ref"] + resolved = _resolve_ref(ref, base_dir, defs, seen, bblock_index) + + # If $ref has sibling keys, merge resolved with siblings + siblings = {k: v for k, v in node.items() if k != "$ref"} + if siblings: + siblings = resolve_node(siblings, base_dir, defs, seen, bblock_index) + if isinstance(resolved, dict): + resolved = deep_merge(resolved, siblings) + # If resolved is not a dict (unlikely), siblings are lost + return resolved + + # Recurse into all dict values + result = {} + for k, v in node.items(): + result[k] = resolve_node(v, base_dir, defs, seen, bblock_index) + return result + + elif isinstance(node, list): + return [resolve_node(item, base_dir, defs, seen, bblock_index) for item in node] + + return node + + +def _inline_unresolved_defs(node: Any, defs: dict, base_dir: Path, seen: set, bblock_index: dict = None) -> Any: + """ + Walk *node* and replace ``{"$comment": "unresolved fragment ref: #/$defs/X"}`` + placeholders with the actual resolved content from *defs*. + Also re-resolve any remaining $ref nodes with the full defs dict. 
+ """ + if isinstance(node, dict): + # Check for placeholder left by pass 1 + if "$comment" in node and len(node) == 1: + comment = node["$comment"] + if comment.startswith("unresolved fragment ref: #/$defs/"): + def_name = comment.split("/")[-1] + if def_name in defs: + return copy.deepcopy(defs[def_name]) + # Also resolve any leftover $ref + if "$ref" in node: + ref = node["$ref"] + resolved = _resolve_ref(ref, base_dir, defs, seen, bblock_index) + siblings = {k: v for k, v in node.items() if k != "$ref"} + if siblings: + siblings = _inline_unresolved_defs(siblings, defs, base_dir, seen, bblock_index) + if isinstance(resolved, dict): + resolved = deep_merge(resolved, siblings) + return resolved + result = {} + for k, v in node.items(): + result[k] = _inline_unresolved_defs(v, defs, base_dir, seen, bblock_index) + return result + elif isinstance(node, list): + return [_inline_unresolved_defs(item, defs, base_dir, seen, bblock_index) for item in node] + return node + + +def _resolve_ref(ref: str, base_dir: Path, defs: dict, seen: set, bblock_index: dict = None) -> Any: + """Parse and resolve a $ref string.""" + if ref.startswith("#/"): + # Fragment-only ref (e.g., #/$defs/Identifier) + pointer = ref[1:] # Strip leading # + # Try the local defs dict first + parts = pointer.lstrip("/").split("/") + if len(parts) == 2 and parts[0] == "$defs" and parts[1] in defs: + return copy.deepcopy(defs[parts[1]]) + # Fall through: shouldn't happen if $defs were resolved, but handle gracefully + return {"$comment": f"unresolved fragment ref: {ref}"} + + # bblocks:// or bblocks: URI scheme + if ref.startswith("bblocks:"): + if bblock_index: + return _resolve_bblocks_ref(ref, bblock_index, seen) + return {"$comment": f"bblocks ref (no index available): {ref}"} + + # File ref, possibly with fragment + if "#" in ref: + file_part, fragment = ref.split("#", 1) + else: + file_part, fragment = ref, None + + file_path = (base_dir / file_part).resolve() + if not file_path.exists(): + return 
{"$comment": f"file not found: {file_path}"} + + # When a fragment is present, keep $defs so the pointer can reach them + resolved = resolve_file(file_path, seen, bblock_index, keep_defs=bool(fragment)) + + if fragment: + try: + resolved = resolve_fragment(resolved, fragment) + except KeyError as e: + return {"$comment": f"could not resolve fragment {fragment} in {file_path}: {e}"} + # The fragment result might itself contain refs -- resolve them + resolved = resolve_node(resolved, file_path.parent, {}, seen, bblock_index) + # Strip $defs if the extracted fragment carried them along + if isinstance(resolved, dict): + resolved.pop("$defs", None) + + return resolved + + +# --------------------------------------------------------------------------- +# allOf flattening (optional) +# --------------------------------------------------------------------------- + +def flatten_allof(schema: Any) -> Any: + """ + Recursively flatten allOf entries into a single object. + Merges properties, required, and other constraints from all allOf entries. + Preserves anyOf/oneOf as-is (they represent valid polymorphic choices). 
+ """ + if isinstance(schema, dict): + # Recurse first so nested allOf in properties/items are handled + result = {} + for k, v in schema.items(): + result[k] = flatten_allof(v) + + # Now flatten allOf in the current object + if "allOf" in result: + all_of = result.pop("allOf") + merged = {} + # Collect all non-allOf keys from the current object + for k, v in result.items(): + merged[k] = v + + for entry in all_of: + if isinstance(entry, dict): + merged = deep_merge(merged, entry) + + return merged + + return result + + elif isinstance(schema, list): + return [flatten_allof(item) for item in schema] + + return schema + + +# --------------------------------------------------------------------------- +# Building block discovery +# --------------------------------------------------------------------------- + +def find_bblock_schema(name: str, sources_dir: Path) -> Path: + """Find the schema entry point for a building block by name. + + Search order: + 1. {sources_dir}/{name}/schema.yaml (flat layout) + 2. {sources_dir}/{name}/schema.json (flat layout, JSON) + 3. {sources_dir}/**/{name}/schema.yaml (nested layout, must have bblock.json) + 4. 
{sources_dir}/**/{name}/schema.json (nested layout, JSON fallback) + """ + # Flat layout + flat_yaml = sources_dir / name / "schema.yaml" + if flat_yaml.exists(): + return flat_yaml + flat_json = sources_dir / name / "schema.json" + if flat_json.exists(): + return flat_json + + # Nested layout -- search recursively for directories matching the name + for bblock_json in sorted(sources_dir.rglob("bblock.json")): + bblock_dir = bblock_json.parent + if bblock_dir.name == name: + yaml_path = bblock_dir / "schema.yaml" + if yaml_path.exists(): + return yaml_path + json_path = bblock_dir / "schema.json" + if json_path.exists(): + return json_path + + print(f"ERROR: Cannot find schema for building block '{name}'", file=sys.stderr) + print(f" Searched in: {sources_dir}", file=sys.stderr) + sys.exit(1) + + +def _detect_sources_dir() -> Path: + """Auto-detect the _sources directory relative to the script or CWD.""" + # Relative to script location (tools/ lives next to _sources/) + script_based = Path(__file__).resolve().parent.parent / "_sources" + if script_based.is_dir(): + return script_based + + # Relative to CWD + cwd_based = Path.cwd() / "_sources" + if cwd_based.is_dir(): + return cwd_based + + return script_based # Fall back; will produce clear error later + + +# --------------------------------------------------------------------------- +# bblocks:// URI resolution +# --------------------------------------------------------------------------- + +def _build_bblock_index(sources_dir: Path) -> dict: + """Build a mapping from bblocks identifier to schema file path. + + Reads ``bblocks-config.yaml`` from the parent of *sources_dir* to obtain + the ``identifier-prefix`` (e.g., ``ogc.``). Then scans for ``bblock.json`` + files and derives each building block's full identifier from its path + relative to *sources_dir*. 
def _build_bblock_index(sources_dir: Path) -> dict:
    """Build a mapping from bblocks identifier to schema file path.

    Reads ``bblocks-config.yaml`` from the parent of *sources_dir* to obtain
    the ``identifier-prefix`` (e.g., ``ogc.``). Then scans for ``bblock.json``
    files and derives each building block's full identifier from its path
    relative to *sources_dir*.

    Returns a dict mapping identifier strings to resolved Path objects, e.g.::

        {"ogc.geo.features.feature": Path(".../geo/features/feature/schema.yaml")}

    Returns an empty dict if no config or no building blocks are found.
    """
    # Look for bblocks-config.yaml in the repo root (parent of sources dir)
    config_path = sources_dir.parent / "bblocks-config.yaml"
    if not config_path.exists():
        return {}

    try:
        with open(config_path, "r", encoding="utf-8") as f:
            config = yaml.safe_load(f) or {}
    except Exception:
        # Best-effort: an unreadable or invalid config simply disables
        # bblocks:// resolution rather than aborting the whole run.
        return {}

    # `or ""` also covers an explicit `identifier-prefix: null` in the
    # config, which .get(..., "") would have passed through as None and
    # crashed the string concatenation below.
    prefix = config.get("identifier-prefix") or ""

    index = {}
    for bblock_json in sorted(sources_dir.rglob("bblock.json")):
        bblock_dir = bblock_json.parent
        # Derive identifier from directory path relative to sources_dir
        rel = bblock_dir.relative_to(sources_dir)
        if not rel.parts:
            # A bblock.json directly inside sources_dir has no derivable
            # identifier; skip it instead of indexing a bare prefix.
            continue
        # Convert path separators to dots: geo/features/feature -> geo.features.feature
        identifier = prefix + ".".join(rel.parts)

        # Prefer the YAML schema, fall back to JSON
        for schema_name in ("schema.yaml", "schema.json"):
            schema_file = bblock_dir / schema_name
            if schema_file.exists():
                index[identifier] = schema_file.resolve()
                break

    return index
+ """ + # Strip the bblocks:// or bblocks: prefix + if ref.startswith("bblocks://"): + remainder = ref[len("bblocks://"):] + elif ref.startswith("bblocks:"): + remainder = ref[len("bblocks:"):] + else: + return {"$comment": f"not a bblocks ref: {ref}"} + + # Split off any fragment + if "#" in remainder: + identifier, fragment = remainder.split("#", 1) + else: + identifier, fragment = remainder, None + + if identifier not in bblock_index: + return {"$comment": f"bblocks ref not found in local index: {ref}"} + + schema_path = bblock_index[identifier] + # When a fragment is present, keep $defs so the pointer can reach them + resolved = resolve_file(schema_path, seen, bblock_index, keep_defs=bool(fragment)) + + if fragment: + try: + resolved = resolve_fragment(resolved, fragment) + except KeyError as e: + return {"$comment": f"could not resolve fragment {fragment} in {ref}: {e}"} + resolved = resolve_node(resolved, schema_path.parent, {}, seen, bblock_index) + # Strip $defs if the extracted fragment carried them along + if isinstance(resolved, dict): + resolved.pop("$defs", None) + + return resolved + + +# --------------------------------------------------------------------------- +# CLI +# --------------------------------------------------------------------------- + +def main(): + parser = argparse.ArgumentParser( + description="Resolve OGC Building Block schemas into a single complete JSON Schema.", + ) + + input_group = parser.add_mutually_exclusive_group(required=True) + input_group.add_argument( + "--file", + type=Path, + help="Resolve an arbitrary schema file by path", + ) + input_group.add_argument( + "--bblock", + help="Resolve a building block by name (searches --sources-dir)", + ) + + parser.add_argument( + "--sources-dir", + type=Path, + default=None, + help="Path to the _sources directory (auto-detected if omitted)", + ) + parser.add_argument( + "-o", "--output", + type=Path, + help="Write output to file (default: stdout)", + ) + parser.add_argument( + 
"--flatten-allof", + action="store_true", + help="Merge allOf entries into single objects", + ) + parser.add_argument( + "--keep-metadata", + action="store_true", + help="Keep $id, x-jsonld-*, and other metadata keys (stripped by default)", + ) + parser.add_argument( + "--strip-keys", + nargs="*", + default=None, + help="Custom set of keys to strip (overrides defaults; ignored with --keep-metadata)", + ) + args = parser.parse_args() + + if args.file: + schema_path = args.file.resolve() + if not schema_path.exists(): + print(f"ERROR: File not found: {schema_path}", file=sys.stderr) + sys.exit(1) + # Try to detect sources_dir for bblocks:// resolution even with --file + sources_dir = args.sources_dir + if sources_dir is None: + sources_dir = _detect_sources_dir() + sources_dir = sources_dir.resolve() + else: + sources_dir = args.sources_dir or _detect_sources_dir() + sources_dir = sources_dir.resolve() + if not sources_dir.is_dir(): + print(f"ERROR: Sources directory not found: {sources_dir}", file=sys.stderr) + sys.exit(1) + schema_path = find_bblock_schema(args.bblock, sources_dir) + + # Build bblocks:// index for cross-building-block references + bblock_index = {} + if sources_dir.is_dir(): + bblock_index = _build_bblock_index(sources_dir) + if bblock_index: + print(f"Indexed {len(bblock_index)} building block(s) for bblocks:// resolution", file=sys.stderr) + + print(f"Resolving: {schema_path}", file=sys.stderr) + + # Resolve all $ref recursively + resolved = resolve_file(schema_path, seen=set(), bblock_index=bblock_index) + + # Strip metadata keys (unless --keep-metadata) + if not args.keep_metadata: + strip_keys = set(args.strip_keys) if args.strip_keys is not None else DEFAULT_STRIP_KEYS + resolved = strip_metadata_keys(resolved, strip_keys=strip_keys, is_root=True) + + # Optionally flatten allOf + if args.flatten_allof: + resolved = flatten_allof(resolved) + + # Output + output_json = json.dumps(resolved, indent=2, ensure_ascii=False) + "\n" + + if 
args.output: + args.output.parent.mkdir(parents=True, exist_ok=True) + with open(args.output, "w", encoding="utf-8") as f: + f.write(output_json) + print(f"Wrote: {args.output}", file=sys.stderr) + else: + sys.stdout.write(output_json) + + +if __name__ == "__main__": + main()