diff --git a/loopstructural/gui/data_conversion/__init__.py b/loopstructural/gui/data_conversion/__init__.py new file mode 100644 index 0000000..6449504 --- /dev/null +++ b/loopstructural/gui/data_conversion/__init__.py @@ -0,0 +1,5 @@ +"""Data conversion GUI components.""" + +from .data_conversion_widget import AutomaticConversionDialog, AutomaticConversionWidget + +__all__ = ["AutomaticConversionDialog", "AutomaticConversionWidget"] diff --git a/loopstructural/gui/data_conversion/configuration.py b/loopstructural/gui/data_conversion/configuration.py new file mode 100644 index 0000000..12ccfef --- /dev/null +++ b/loopstructural/gui/data_conversion/configuration.py @@ -0,0 +1,149 @@ +"""Configuration helpers used by the data conversion UI.""" + +from __future__ import annotations + +from copy import deepcopy +from typing import Any, Dict, Iterable, MutableMapping + + +class Config: + """Container for the default NTGS configuration.""" + + def __init__(self) -> None: + self.fold_config = { + "structtype_column": "FoldType", + "fold_text": "'Anticline','Syncline','Antiform','Synform','Monocline','Monoform','Neutral','Fold axis','Overturned syncline'", + "description_column": "Desc", + "synform_text": "FoldType", + "foldname_column": "FoldName", + "objectid_column": "OBJECTID", + "tightness_column": "IntlimbAng", + "axial_plane_dipdir_column": "AxPlDipDir", + "axial_plane_dip_column": "AxPlDip", + } + + self.fault_config = { + "orientation_type": "dip direction", + "structtype_column": "FaultType", + "fault_text": "'Thrust','Reverse','Normal','Shear zone','Strike-slip','Thrust','Unknown'", + "dip_null_value": "-999", + "dipdir_flag": "num", + "dipdir_column": "DipDir", + "dip_column": "Dip", + "dipestimate_column": "DipEstimate", + "dipestimate_text": "'NORTH_EAST','NORTH',,'NOT ACCESSED'", + "displacement_column": "Displace", + "displacement_text": "'1m-100m', '100m-1km', '1km-5km', '>5km'", + "fault_length_column": "FaultLen", + "fault_length_text": "Small (0-5km),Medium (5-30km),Large (30-100km),Regional (>100km),Unclassified", + "name_column": "FaultName", + "objectid_column": "OBJECTID", + } + + self.geology_config = { + "unitname_column": "Formation", + "alt_unitname_column": "Formation", + "group_column": "Group", + "supergroup_column": "Supergroup", + "description_column": "LithDescn1", + "minage_column": "AgeMin", + "maxage_column": "AgeMax", + "rocktype_column": "LithClass", + "alt_rocktype_column": "RockCat", + "sill_text": "RockCat", + "intrusive_text": "RockCat", + "volcanic_text": "RockCat", + "objectid_column": "OBJECTID", + "ignore_lithology_codes": ["cover", "Unknown"], + } + + self.structure_config = { + "orientation_type": "dip direction", + "dipdir_column": "DipDir", + "dip_column": "Dip", + "description_column": "FeatDesc", + "bedding_text": "ObsType", + "overturned_column": "Desc", + "overturned_text": "overturned", + "objectid_column": "OBJECTID", + } + + self.config_map = { + "geology": self.geology_config, + "structure": self.structure_config, + "fault": self.fault_config, + "fold": self.fold_config, + } + + def __getitem__(self, datatype: str) -> Dict[str, Any]: + return self.config_map[datatype] + + def as_dict(self) -> Dict[str, Dict[str, Any]]: + """Return a deep copy of the configuration map.""" + return deepcopy(self.config_map) + + +def _coerce_config_value(template_value: Any, new_value: Any) -> Any: + """Coerce user supplied values into the template format.""" + if isinstance(template_value, list): + if isinstance(new_value, list): + return new_value + if new_value in (None, ""): + return [] + if isinstance(new_value, str): + return [item.strip() for item in new_value.split(",") if item.strip()] + return [str(new_value)] + + if isinstance(template_value, (int, float)): + try: + return type(template_value)(new_value) + except (TypeError, ValueError): + return template_value + + if template_value is None: + return new_value + + if new_value is None: + return "" + + return str(new_value) + + +class ConfigurationState: + """State holder for the NTGS configuration mapping.""" + + def __init__(self, *, base_config: MutableMapping[str, Dict[str, Any]] | None = None): + self._config = deepcopy(base_config) if base_config is not None else Config().as_dict() + + def data_types(self) -> Iterable[str]: + """Return the supported data types.""" + return self._config.keys() + + def get_config_for_type(self, data_type: str) -> Dict[str, Any]: + """Return a copy of the configuration for a single data type.""" + self._ensure_data_type(data_type) + return deepcopy(self._config[data_type]) + + def set_value(self, data_type: str, key: str, value: Any) -> None: + """Update a single configuration entry.""" + self._ensure_data_type(data_type) + template_value = self._config[data_type].get(key) + self._config[data_type][key] = _coerce_config_value(template_value, value) + + def update_values(self, data_type: str, updates: Dict[str, Any]) -> None: + """Bulk update configuration entries for a type.""" + for key, value in updates.items(): + self.set_value(data_type, key, value) + + def get_value(self, data_type: str, key: str) -> Any: + """Return the stored value for a configuration entry.""" + self._ensure_data_type(data_type) + return self._config[data_type].get(key) + + def as_dict(self) -> Dict[str, Dict[str, Any]]: + """Return a deep copy of the entire configuration map.""" + return deepcopy(self._config) + + def _ensure_data_type(self, data_type: str) -> None: + if data_type not in self._config: + raise KeyError(f"Unknown data type '{data_type}'") diff --git a/loopstructural/gui/data_conversion/data_conversion_widget.py b/loopstructural/gui/data_conversion/data_conversion_widget.py new file mode 100644 index 0000000..59dcbbb --- /dev/null +++ b/loopstructural/gui/data_conversion/data_conversion_widget.py @@ -0,0 +1,778 @@ +"""Data conversion widgets and dialog for LoopStructural.""" + +from __future__ import annotations + +import os +import re +from dataclasses import dataclass +from typing import Any, Dict, Iterable, List, Mapping, Optional, Tuple + +from LoopDataConverter import Datatype, InputData, LoopConverter, SurveyName +from PyQt5.QtCore import Qt, QTimer +from PyQt5.QtWidgets import ( + QComboBox, + QDialog, + QDialogButtonBox, + QFormLayout, + QHBoxLayout, + QLabel, + QPushButton, + QTextEdit, + QVBoxLayout, + QWidget, +) +from qgis.core import QgsMapLayerProxyModel, QgsProject, QgsVectorLayer +from qgis.gui import QgsMapLayerComboBox + +from ...main.helpers import ColumnMatcher +from ...main.vectorLayerWrapper import QgsLayerFromDataFrame, QgsLayerFromGeoDataFrame + +try: + from geopandas import GeoDataFrame +except Exception: # pragma: no cover - geopandas may be unavailable in tests + GeoDataFrame = None + +try: + from pandas import DataFrame +except ImportException: # pragma: no cover - pandas may be unavailable in tests + DataFrame = None + + +def _is_tabular_payload(value: Any) -> bool: + if GeoDataFrame is not None and isinstance(value, GeoDataFrame): + return True + if DataFrame is not None and isinstance(value, DataFrame): + return True + return False + + +def _build_input_data( + formatted_sources: Mapping[str, str], + config_map: Optional[Mapping[str, Mapping[str, Any]]], +) -> Tuple[InputData, bool]: + if config_map is None: + return InputData(**formatted_sources), False + + for key in ("config_map", "config", "configuration"): + try: + return InputData(**formatted_sources, **{key: config_map}), True + except TypeError: + continue + + return InputData(**formatted_sources), False + + +def _build_loop_converter( + survey_name: SurveyName, + input_data: InputData, + config_map: Optional[Mapping[str, Mapping[str, Any]]], +) -> Tuple[LoopConverter, bool]: + if config_map is None: + return LoopConverter(survey_name=survey_name, data=input_data), False + + for key in ("config_map", "config", "configuration"): + try: + return ( + LoopConverter(survey_name=survey_name, data=input_data, **{key: config_map}), + True, + ) + except TypeError: + continue + + converter = LoopConverter(survey_name=survey_name, data=input_data) + for method_name in ("set_config_map", "set_config", "set_configuration"): + method = getattr(converter, method_name, None) + if callable(method): + method(config_map) + return converter, True + + for attr_name in ("config_map", "config", "configuration"): + if hasattr(converter, attr_name): + try: + setattr(converter, attr_name, config_map) + return converter, True + except Exception: + continue + + return converter, False + + +def _run_loop_conversion( + survey_name: SurveyName, + data_sources: Mapping[Datatype | str, str], + *, + config_map: Optional[Mapping[str, Mapping[str, Any]]] = None, +) -> Tuple[Any, Any]: + """Execute the LoopDataConverter workflow for the supplied survey and data.""" + if not data_sources: + raise ValueError("At least one data source is required for conversion.") + + formatted_sources: Dict[str, str] = {} + for data_type, dataset in data_sources.items(): + if not dataset: + continue + key = data_type.name if isinstance(data_type, Datatype) else str(data_type) + formatted_sources[key.split(".")[-1].upper()] = dataset + + if not formatted_sources: + raise ValueError("Unable to run conversion without valid data sources.") + + input_data, input_has_config = _build_input_data(formatted_sources, config_map) + converter, converter_has_config = _build_loop_converter(survey_name, input_data, config_map) + if config_map is not None and not (input_has_config or converter_has_config): + raise ValueError("Conversion configuration could not be applied to the converter.") + conversion_result = converter.convert() + return converter, conversion_result + + +@dataclass +class ConverterOption: + """Simple data container describing an available converter.""" + + identifier: str + label: str + description: str = "" + + def to_dict(self) -> Dict[str, str]: + """Return a serialisable representation of the option.""" + return { + "id": self.identifier, + "label": self.label, + "description": self.description, + } + + +def _normalise_converters(converters: Optional[Iterable[Any]]) -> List[ConverterOption]: + normalised: List[ConverterOption] = [] + if not converters: + return normalised + + if isinstance(converters, (str, bytes)): + converters = [converters] + + for raw in converters: + if isinstance(raw, ConverterOption): + normalised.append(raw) + continue + + if isinstance(raw, Mapping): + identifier = str( + raw.get("id") + or raw.get("identifier") + or raw.get("name") + or raw.get("label") + or "converter" + ) + label = str(raw.get("label") or raw.get("name") or identifier) + description = str(raw.get("description") or "") + normalised.append( + ConverterOption(identifier=identifier, label=label, description=description) + ) + continue + + text = str(raw) + normalised.append(ConverterOption(identifier=text, label=text, description="")) + return normalised + + +class AutomaticConversionWidget(QWidget): + """Widget showing the automatic conversion workflow.""" + + SUPPORTED_DATA_TYPES: Tuple[str, ...] = ("GEOLOGY", "STRUCTURE", "FAULT", "FOLD") + OUTPUT_DATA_TYPES: Tuple[str, ...] = ( + "GEOLOGY", + "STRUCTURE", + "FAULT", + "FOLD", + "FAULT_ORIENTATION", + ) + OUTPUT_GROUP_NAME = "Loop-Ready Data" + + def __init__( + self, + parent: Optional[QWidget] = None, + *, + converters: Optional[Iterable[Any]] = None, + project: Optional[QgsProject] = None, + ): + super().__init__(parent) + self.project = project or QgsProject.instance() + self._options: List[ConverterOption] = [] + self._data_types: List[Datatype | str] = self._discover_data_types() + self.layer_selectors: Dict[Datatype | str, QgsMapLayerComboBox] = {} + layout = QVBoxLayout(self) + layout.setContentsMargins(0, 0, 0, 0) + description = QLabel( + "Automatically run one of the available converters on the selected data sources." + ) + description.setWordWrap(True) + layout.addWidget(description) + + self.converter_combo = QComboBox() + self.converter_combo.setToolTip("Select the converter implementation to run.") + layout.addWidget(self.converter_combo) + + self.summary_text = QTextEdit() + self.summary_text.setReadOnly(True) + self.summary_text.setMinimumHeight(80) + layout.addWidget(self.summary_text) + + source_description = QLabel( + "Select the layers that correspond to each dataset required by the converter." + ) + source_description.setWordWrap(True) + layout.addWidget(source_description) + + self.sources_widget = QWidget() + self.sources_layout = QFormLayout(self.sources_widget) + self.sources_layout.setLabelAlignment(Qt.AlignLeft | Qt.AlignTop) + layout.addWidget(self.sources_widget) + self._build_data_source_inputs() + if self.project is not None: + try: + self.project.layersAdded.connect(self._guess_layers) + except Exception: + # Best-effort: if the project object does not provide a compatible + # layersAdded signal (e.g., in certain QGIS versions or test stubs), + # silently skip automatic layer guessing rather than failing the UI. + pass + + actions_widget = QWidget() + actions_layout = QHBoxLayout(actions_widget) + actions_layout.setContentsMargins(0, 0, 0, 0) + + self.run_button = QPushButton("Run Conversion") + self.run_button.clicked.connect(self._handle_run_conversion) + actions_layout.addWidget(self.run_button) + actions_layout.addStretch() + + self.status_label = QLabel("") + self.status_label.setObjectName("automaticConversionStatus") + actions_layout.addWidget(self.status_label) + + layout.addWidget(actions_widget) + + self.set_converters(converters) + self.converter_combo.currentIndexChanged.connect(self._update_summary_text) + + def set_converters(self, converters: Optional[Iterable[Any]]) -> None: + """Populate the dropdown with the converters supplied by the backend.""" + available = self._default_converter_options() if converters is None else converters + self._options = _normalise_converters(available) + self.converter_combo.clear() + for option in self._options: + self.converter_combo.addItem(option.label, option.identifier) + if not self._options: + self.converter_combo.addItem("No converters available") + self.converter_combo.setEnabled(False) + self.run_button.setEnabled(False) + else: + self.converter_combo.setEnabled(True) + self.run_button.setEnabled(True) + self._update_summary_text() + + def current_converter(self) -> Optional[ConverterOption]: + """Return the active converter option, if any.""" + index = self.converter_combo.currentIndex() + if index < 0 or index >= len(self._options): + return None + return self._options[index] + + def _update_summary_text(self) -> None: + option = self.current_converter() + if option is None: + self.summary_text.setPlainText( + "No converter selected. Please configure converters in the plugin settings." + ) + else: + description = option.description or "No description provided for this converter." + self.summary_text.setPlainText(description) + + def _build_data_source_inputs(self) -> None: + while self.sources_layout.count(): + item = self.sources_layout.takeAt(0) + widget = item.widget() + if widget: + widget.deleteLater() + self.layer_selectors.clear() + + for data_type in self._data_types: + combo = QgsMapLayerComboBox() + if self.project is not None: + try: + combo.setProject(self.project) + except Exception: + # Some QGIS/Qt environments may not support setProject or may raise here; + # failure to bind the project is non-fatal, so we intentionally ignore + pass + combo.setFilters(self._layer_filter_for_data_type(data_type)) + combo.setAllowEmptyLayer(True) + combo.setObjectName(f"automaticSource_{self._format_identifier_label(data_type)}") + self.sources_layout.addRow(self._format_identifier_label(data_type), combo) + self.layer_selectors[data_type] = combo + self._schedule_guess_layers() + + def _schedule_guess_layers(self) -> None: + QTimer.singleShot(0, self._guess_layers) + + def _guess_layers(self) -> None: + """Attempt to auto-select layers based on common naming conventions.""" + if self.project is None and QgsProject.instance() is None: + return + + for data_type, combo in self.layer_selectors.items(): + if combo.currentLayer() is not None: + continue + candidates = self._build_layer_candidate_map(combo) + if not candidates: + continue + matcher = ColumnMatcher(list(candidates.keys())) + match = None + for target in self._target_keys_for_data_type(data_type): + match = matcher.find_match(target) + if match is not None: + break + if match is None: + match = matcher.find_match(self._format_identifier_label(data_type)) + if match is None and isinstance(data_type, Datatype): + value = getattr(data_type, "value", None) + if isinstance(value, str) and value.strip(): + match = matcher.find_match(value) + if match: + layer = candidates.get(match) + if layer is not None: + combo.setLayer(layer) + + def _build_layer_candidate_map(self, combo: QgsMapLayerComboBox) -> Dict[str, QgsVectorLayer]: + candidates: Dict[str, QgsVectorLayer] = {} + for layer in self._layers_for_combo(combo): + if not isinstance(layer, QgsVectorLayer) or not layer.isValid(): + continue + for candidate in self._layer_candidates(layer): + if candidate and candidate not in candidates: + candidates[candidate] = layer + upper_candidate = candidate.upper() if isinstance(candidate, str) else None + if upper_candidate and upper_candidate not in candidates: + candidates[upper_candidate] = layer + return candidates + + def _layers_for_combo(self, combo: QgsMapLayerComboBox) -> List[QgsVectorLayer]: + layers: List[QgsVectorLayer] = [] + for index in range(combo.count()): + layer = combo.layer(index) + if isinstance(layer, QgsVectorLayer): + layers.append(layer) + if layers: + return layers + project = self.project or QgsProject.instance() + if project is None: + return layers + for layer in project.mapLayers().values(): + if isinstance(layer, QgsVectorLayer) and layer.isValid(): + layers.append(layer) + return layers + + def _layer_candidates(self, layer: QgsVectorLayer) -> List[str]: + names: List[str] = [] + if layer is None: + return names + layer_name = layer.name() + if layer_name: + names.append(layer_name) + try: + source = layer.source() + except Exception: + source = "" + names.extend(self._names_from_source(source)) + return names + + def _names_from_source(self, source: str) -> List[str]: + if not source: + return [] + + names: List[str] = [] + parts = source.split("|") + base = parts[0].strip() + if base: + names.append(base) + basename = os.path.basename(base) + if basename: + names.append(basename) + stem, _ = os.path.splitext(basename) + if stem: + names.append(stem) + + for part in parts[1:]: + key, _, value = part.partition("=") + if not value: + continue + key = key.strip().lower() + if key in ("layername", "table"): + cleaned = value.strip().strip("\"'").strip() + if cleaned: + names.append(cleaned) + + for pattern in (r"\blayername\s*=\s*([^|;]+)", r"\btable\s*=\s*([^|;]+)"): + match = re.search(pattern, source, re.IGNORECASE) + if match: + cleaned = match.group(1).strip().strip("\"'").strip() + if cleaned: + names.append(cleaned) + + return names + + def _target_key_for_data_type(self, data_type: Datatype | str) -> str: + if isinstance(data_type, Datatype): + return data_type.name + return str(data_type) + + def _target_keys_for_data_type(self, data_type: Datatype | str) -> List[str]: + key = self._target_key_for_data_type(data_type).upper() + if key == "GEOLOGY": + return ["GEOLOGY", "LITH", "LITHOLOGY", "OUTCROP"] + if key == "FOLD": + return ["FOLD", "FOLDS"] + if key == "FAULT": + return ["FAULT", "FAULTS"] + if key == "STRUCTURE": + return ["STRUCTURE", "STRUCTURES"] + return [key] + + def _layer_filter_for_data_type(self, data_type: Datatype | str) -> int: + key = self._target_key_for_data_type(data_type).upper() + if key == "GEOLOGY": + return QgsMapLayerProxyModel.PolygonLayer + if key == "FAULT": + return QgsMapLayerProxyModel.LineLayer + if key == "STRUCTURE": + return QgsMapLayerProxyModel.PointLayer + if key == "FOLD": + return QgsMapLayerProxyModel.LineLayer + return QgsMapLayerProxyModel.VectorLayer + + def _collect_data_sources(self) -> Dict[Datatype | str, str]: + data_sources: Dict[Datatype | str, str] = {} + for data_type, combo in self.layer_selectors.items(): + layer = combo.currentLayer() + if layer and isinstance(layer, QgsVectorLayer) and layer.isValid(): + path = layer.source() + if path: + data_sources[data_type] = path + return data_sources + + def _handle_run_conversion(self) -> bool: + converter_option = self.current_converter() + if converter_option is None: + self._update_status("Please select a converter before running.", error=True) + return False + + sources = self._collect_data_sources() + if not sources: + self._update_status("Select at least one data source layer before running.", error=True) + return False + + loop_converter: Any = None + result: Any = None + added_layers = 0 + try: + survey = self._normalise_survey_name(converter_option.identifier) + loop_converter, result = self.run_conversion(survey, sources) + layers = self._build_layers_from_converter(loop_converter) + if not layers: + layers = self._materialise_layers_from_result(result) + if layers: + added_layers = self._add_layers_to_project_group(layers) + except Exception as exc: # pragma: no cover - UI feedback + self._update_status(f"Conversion failed: {exc}", error=True) + return False + + if added_layers: + message = f"Conversion completed: {added_layers} layer(s) added to '{self.OUTPUT_GROUP_NAME}'." + elif result not in (None, True): + message = f"Conversion completed: {result}" + else: + message = "Conversion completed successfully." + self._update_status(message) + return True + + def _update_status(self, message: str, *, error: bool = False) -> None: + color = "#c00000" if error else "#006400" + self.status_label.setStyleSheet(f"color: {color};") + self.status_label.setText(message) + + def run_conversion( + self, survey_name: SurveyName | str, data_sources: Mapping[Datatype | str, str] + ) -> Tuple[Any, Any]: + """Execute the LoopConverter against the supplied dataset.""" + survey = self._normalise_survey_name(survey_name) + return _run_loop_conversion(survey, data_sources) + + @staticmethod + def _normalise_survey_name(value: SurveyName | str) -> SurveyName: + """Convert user-provided identifiers into a SurveyName enum.""" + if isinstance(value, SurveyName): + return value + key = str(value).strip() + if "." in key: + key = key.split(".", 1)[1] + try: + return SurveyName[key] + except KeyError: + pass + try: + return SurveyName[key.upper()] + except KeyError as exc: + raise ValueError(f"Unknown survey name '{value}'.") from exc + + def _build_layers_from_converter(self, converter: Any) -> List[QgsVectorLayer]: + if converter is None: + return [] + data_store = getattr(converter, "data", None) + if not data_store: + return [] + + layers: List[QgsVectorLayer] = [] + for identifier in self.OUTPUT_DATA_TYPES: + dtype = self._datatype_for_name(identifier) + dataset = self._extract_dataset(data_store, dtype) + if dataset is None: + continue + layer = self._vector_layer_from_value(dtype, dataset) + if layer is not None and layer.isValid(): + layers.append(layer) + return layers + + def _datatype_for_name(self, identifier: str | Datatype) -> Datatype | str: + if isinstance(identifier, Datatype): + return identifier + try: + return Datatype[str(identifier)] + except Exception: + return str(identifier) + + def _extract_dataset(self, data_store: Any, key: Datatype | str) -> Any: + if data_store is None: + return None + + candidates: List[Any] = [] + if isinstance(key, Datatype): + candidates.append(key) + candidates.append(key.name) + value = getattr(key, "value", None) + if value is not None: + candidates.append(value) + else: + candidates.append(key) + + text_key = str(key) + if "." in text_key: + text_key = text_key.split(".", 1)[-1] + candidates.extend( + [ + text_key, + text_key.upper(), + text_key.lower(), + ] + ) + + for candidate in candidates: + if candidate is None: + continue + if isinstance(data_store, Mapping) and candidate in data_store: + return data_store[candidate] + getter = getattr(data_store, "__getitem__", None) + if callable(getter): + try: + return getter(candidate) + except Exception: + pass + attr_name = str(candidate).lower() + if hasattr(data_store, attr_name): + return getattr(data_store, attr_name) + return None + + def _materialise_layers_from_result(self, result: Any) -> List[QgsVectorLayer]: + layers: List[QgsVectorLayer] = [] + self._collect_layers_from_result(result, layers, prefix="output") + return layers + + def _collect_layers_from_result( + self, payload: Any, layers: List[QgsVectorLayer], *, prefix: str + ) -> None: + if payload is None: + return + if _is_tabular_payload(payload): + layer = self._vector_layer_from_value(prefix, payload) + if layer is not None and layer.isValid(): + layers.append(layer) + return + if isinstance(payload, Mapping): + for key, value in payload.items(): + label = str(key) + self._collect_layers_from_result(value, layers, prefix=label or prefix) + return + if isinstance(payload, (list, tuple, set)): + for index, value in enumerate(payload, start=1): + self._collect_layers_from_result(value, layers, prefix=f"{prefix}_{index}") + return + layer = self._vector_layer_from_value(prefix, payload) + if layer is not None and layer.isValid(): + layers.append(layer) + + def _vector_layer_from_value(self, name: Any, value: Any) -> Optional[QgsVectorLayer]: + label = self._format_identifier_label(str(name)) + if isinstance(value, QgsVectorLayer): + value.setName(label) + return value + if GeoDataFrame is not None and isinstance(value, GeoDataFrame): + try: + return QgsLayerFromGeoDataFrame(value, layer_name=label) + except ValueError: + # fall back to attribute-only table if geometry is missing + return QgsLayerFromDataFrame(value, layer_name=label) + if DataFrame is not None and isinstance(value, DataFrame): + return QgsLayerFromDataFrame(value, layer_name=label) + if ( + DataFrame is not None + and isinstance(value, list) + and value + and all(isinstance(item, Mapping) for item in value) + ): + try: + dataframe = DataFrame(value) + except Exception: + dataframe = None + if dataframe is not None: + return QgsLayerFromDataFrame(dataframe, layer_name=label) + if isinstance(value, str): + layer = QgsVectorLayer(value, label, "ogr") + return layer if layer.isValid() else None + if isinstance(value, Mapping): + provider = str(value.get("provider") or "ogr") + nested_layer = value.get("layer") + if isinstance(nested_layer, QgsVectorLayer): + nested_layer.setName(str(value.get("name") or label)) + return nested_layer if nested_layer.isValid() else None + for key in ("path", "source", "file"): + path = value.get(key) + if isinstance(path, str): + layer_name = str(value.get("name") or label) + layer = QgsVectorLayer(path, layer_name, provider) + return layer if layer.isValid() else None + return None + + def _add_layers_to_project_group(self, layers: Iterable[QgsVectorLayer]) -> int: + project = self.project or QgsProject.instance() + if project is None: + return 0 + root = project.layerTreeRoot() + if root is None: + return 0 + group = root.findGroup(self.OUTPUT_GROUP_NAME) + if group is None: + group = root.insertGroup(0, self.OUTPUT_GROUP_NAME) + added = 0 + for layer in layers: + if project.mapLayer(layer.id()) is None: + project.addMapLayer(layer, False) + group.addLayer(layer) + added += 1 + return added + + def _discover_data_types(self) -> List[Datatype | str]: + members = getattr(Datatype, "__members__", None) + if isinstance(members, Mapping): + selected = [members[name] for name in self.SUPPORTED_DATA_TYPES if name in members] + if selected: + return selected + + try: + enum_values = list(Datatype) + except TypeError: + enum_values = [] + data_types: List[Datatype | str] = [] + for candidate in enum_values: + name = getattr(candidate, "name", str(candidate)).upper() + if name in self.SUPPORTED_DATA_TYPES: + data_types.append(candidate) + if data_types: + return data_types + return list(self.SUPPORTED_DATA_TYPES) + + def _format_identifier_label(self, identifier: Datatype | str) -> str: + member: Optional[Datatype] = identifier if isinstance(identifier, Datatype) else None + text_value = None if isinstance(identifier, Datatype) else str(identifier).strip() + + if member is None and text_value: + # Try exact member name + candidates = getattr(Datatype, "__members__", {}) + key = text_value.split(".", 1)[-1].upper() + member = candidates.get(key) + + if member is None and text_value: + # Try matching the enum value + lowered = text_value.lower() + for enum_member in getattr(Datatype, "__members__", {}).values(): + if str(getattr(enum_member, "value", "")).lower() == lowered: + member = enum_member + break + + if member is not None: + name = member.name + else: + name = text_value or "Data" + if "." in name: + name = name.split(".")[-1] + + parts = name.replace("_", " ").split() + return " ".join(part.capitalize() for part in parts) or "Data" + + def _default_converter_options(self) -> List[ConverterOption]: + members = getattr(SurveyName, "__members__", None) + if not isinstance(members, Mapping): + return [] + options: List[ConverterOption] = [] + for name, survey in members.items(): + label = getattr(survey, "value", None) + if not isinstance(label, str) or not label.strip(): + label = self._format_identifier_label(name) + description = getattr(survey, "description", "") + options.append(ConverterOption(identifier=name, label=label, description=description)) + return options + + +class AutomaticConversionDialog(QDialog): + """Dialog wrapper for the automatic conversion workflow.""" + + def __init__( + self, + parent: Optional[QWidget] = None, + *, + converters: Optional[Iterable[Any]] = None, + project: Optional[QgsProject] = None, + ): + super().__init__(parent) + self.setWindowTitle("Data Converter") + self.project = project or QgsProject.instance() + + layout = QVBoxLayout(self) + description = QLabel("Convert geological datasets for use within LoopStructural.") + description.setWordWrap(True) + layout.addWidget(description) + + self.widget = AutomaticConversionWidget(self, converters=converters, project=self.project) + layout.addWidget(self.widget) + self.widget.run_button.hide() + + self.button_box = QDialogButtonBox(QDialogButtonBox.Ok | QDialogButtonBox.Cancel, self) + self.button_box.accepted.connect(self._run_and_accept) + self.button_box.rejected.connect(self.reject) + layout.addWidget(self.button_box) + + def _run_and_accept(self) -> None: + if self.widget._handle_run_conversion(): + self.accept() + + def set_converters(self, converters: Iterable[Any]) -> None: + """Update the converter options displayed in the dialog.""" + self.widget.set_converters(converters) diff --git a/loopstructural/gui/loop_widget.py b/loopstructural/gui/loop_widget.py index bf78f50..15864dc 100644 --- a/loopstructural/gui/loop_widget.py +++ b/loopstructural/gui/loop_widget.py @@ -6,6 +6,7 @@ """ from PyQt5.QtWidgets import QTabWidget, QVBoxLayout, QWidget + from .modelling.modelling_widget import ModellingWidget from .visualisation.visualisation_widget import VisualisationWidget diff --git a/loopstructural/gui/map2loop_tools/thickness_calculator_widget.py b/loopstructural/gui/map2loop_tools/thickness_calculator_widget.py index 5fb25fb..c2aa8cb 100644 --- a/loopstructural/gui/map2loop_tools/thickness_calculator_widget.py +++ b/loopstructural/gui/map2loop_tools/thickness_calculator_widget.py @@ -1,8 +1,8 @@ """Widget for thickness calculator.""" import os -import pandas as pd +import pandas as pd from PyQt5.QtWidgets import QMessageBox, QWidget from qgis.core import QgsMapLayerProxyModel from qgis.PyQt import uic @@ -45,9 +45,10 @@ def __init__(self, parent=None, data_manager=None, debug_manager=None): self.basalContactsComboBox.setFilters(QgsMapLayerProxyModel.LineLayer) self.sampledContactsComboBox.setFilters(QgsMapLayerProxyModel.PointLayer) self.structureLayerComboBox.setFilters(QgsMapLayerProxyModel.PointLayer) + self.crossSectionLayerComboBox.setFilters(QgsMapLayerProxyModel.LineLayer) # Initialize calculator types - self.calculator_types = ["InterpolatedStructure", "StructuralPoint"] + self.calculator_types = ["InterpolatedStructure", "StructuralPoint", "AlongSection"] self.calculatorTypeComboBox.addItems(self.calculator_types) # Initialize orientation types @@ -151,6 +152,14 @@ def _guess_layers(self): structure_layer = self.data_manager.find_layer_by_name(structure_layer_match) self.structureLayerComboBox.setLayer(structure_layer) + # Attempt to find cross-sections layer + cross_sections_layer_names = get_layer_names(self.crossSectionLayerComboBox) + cross_sections_matcher = ColumnMatcher(cross_sections_layer_names) + cross_sections_layer_match = cross_sections_matcher.find_match('CROSS_SECTIONS') + if cross_sections_layer_match: + cross_sections_layer = self.data_manager.find_layer_by_name(cross_sections_layer_match) + self.crossSectionLayerComboBox.setLayer(cross_sections_layer) + def _setup_field_combo_boxes(self): """Set up field combo boxes to link to their respective layers.""" self.unitNameFieldComboBox.setLayer(self.geologyLayerComboBox.currentLayer()) @@ -210,7 +219,18 @@ def _on_calculator_type_changed(self): if calculator_type == "StructuralPoint": self.maxLineLengthLabel.setVisible(True) self.maxLineLengthSpinBox.setVisible(True) - else: # InterpolatedStructure + self.crossSectionLayerLabel.setVisible(False) + self.crossSectionLayerComboBox.setVisible(False) + + elif calculator_type == "InterpolatedStructure": + self.maxLineLengthLabel.setVisible(False) + self.maxLineLengthSpinBox.setVisible(False) + self.crossSectionLayerLabel.setVisible(False) + self.crossSectionLayerComboBox.setVisible(False) + + elif calculator_type == "AlongSection": + self.crossSectionLayerLabel.setVisible(True) + self.crossSectionLayerComboBox.setVisible(True) self.maxLineLengthLabel.setVisible(False) self.maxLineLengthSpinBox.setVisible(False) @@ -227,6 +247,7 @@ def _restore_selection(self): ('basal_contacts_layer', self.basalContactsComboBox), ('sampled_contacts_layer', self.sampledContactsComboBox), ('structure_layer', self.structureLayerComboBox), + ("cross_sections_layer", self.crossSectionLayerComboBox), ): if layer_name := settings.get(key): layer = self.data_manager.find_layer_by_name(layer_name) @@ -277,6 +298,11 @@ def _persist_selection(self): if self.structureLayerComboBox.currentLayer() else None ), + 'cross_sections_layer': ( + self.crossSectionLayerComboBox.currentLayer().name() + if self.crossSectionLayerComboBox.currentLayer() + else None + ), 'calculator_type_index': self.calculatorTypeComboBox.currentIndex(), 'orientation_type_index': self.orientationTypeComboBox.currentIndex(), 'max_line_length': self.maxLineLengthSpinBox.value(), @@ -310,6 +336,11 @@ def _run_calculator(self): QMessageBox.warning(self, "Missing Input", "Please select a structure layer.") return False + elif calculator_type == "AlongSection": + if not self.crossSectionLayerComboBox.currentLayer(): + QMessageBox.warning(self, "Missing Input", "Please select a cross-sections layer.") + return False + # Prepare parameters params = self.get_parameters() @@ -430,6 +461,7 @@ def get_parameters(self): 'basal_contacts': self.basalContactsComboBox.currentLayer(), 'sampled_contacts': self.sampledContactsComboBox.currentLayer(), 'structure': self.structureLayerComboBox.currentLayer(), + 'cross_sections': self.crossSectionLayerComboBox.currentLayer(), 'orientation_type': self.orientationTypeComboBox.currentText(), 'unit_name_field': self.unitNameFieldComboBox.currentField(), 'dip_field': self.dipFieldComboBox.currentField(), diff --git a/loopstructural/gui/map2loop_tools/thickness_calculator_widget.ui b/loopstructural/gui/map2loop_tools/thickness_calculator_widget.ui index 63d978e..405b124 100644 --- a/loopstructural/gui/map2loop_tools/thickness_calculator_widget.ui +++ b/loopstructural/gui/map2loop_tools/thickness_calculator_widget.ui @@ -161,6 +161,20 @@ + + + + Cross-Sections Layer: + + + + + + + true + + + diff --git a/loopstructural/main/m2l_api.py b/loopstructural/main/m2l_api.py index 2b80938..f1fee23 100644 --- a/loopstructural/main/m2l_api.py +++ b/loopstructural/main/m2l_api.py @@ -8,7 +8,7 @@ SorterObservationProjections, SorterUseNetworkX, ) -from map2loop.thickness_calculator import InterpolatedStructure, StructuralPoint +from map2loop.thickness_calculator import AlongSection, InterpolatedStructure, StructuralPoint from osgeo import gdal from qgis.core import QgsVectorLayer @@ -394,6 +394,7 @@ def calculate_thickness( basal_contacts, sampled_contacts, structure, + cross_sections=None, calculator_type="InterpolatedStructure", dtm=None, unit_name_field="UNITNAME", @@ -418,6 +419,8 @@ def calculate_thickness( Sampled contacts point layer. structure : QgsVectorLayer or GeoDataFrame Structure point layer with orientation data. + cross_sections : QgsVectorLayer or GeoDataFrame, optional + Cross-sections line layer, by default None. calculator_type : str, optional Type of calculator ("InterpolatedStructure" or "StructuralPoint"), by default "InterpolatedStructure". dtm : QgsRasterLayer or GDAL dataset, optional @@ -455,6 +458,7 @@ def calculate_thickness( ) sampled_contacts_gdf = qgsLayerToGeoDataFrame(sampled_contacts) structure_gdf = qgsLayerToGeoDataFrame(structure) + cross_sections_gdf = qgsLayerToGeoDataFrame(cross_sections) # Log parameters via DebugManager if provided if debug_manager: @@ -470,6 +474,7 @@ def calculate_thickness( "basal_contacts": basal_contacts_gdf, "sampled_contacts": sampled_contacts_gdf, "structure": structure_gdf, + "cross_sections": cross_sections_gdf, }, ) @@ -479,6 +484,7 @@ def calculate_thickness( 'maxy': geology_gdf.total_bounds[3], 'miny': geology_gdf.total_bounds[1], } + # Rename unit name field if needed if unit_name_field and unit_name_field != 'UNITNAME': if unit_name_field in geology_gdf.columns: @@ -513,7 +519,7 @@ def calculate_thickness( is_strike=orientation_type == 'Strike', max_line_length=max_line_length, ) - else: # StructuralPoint + if calculator_type == "StructuralPoint": if max_line_length is None: raise ValueError("max_line_length parameter is required for StructuralPoint calculator") calculator = StructuralPoint( @@ -522,6 +528,12 @@ def calculate_thickness( is_strike=orientation_type == 'Strike', max_line_length=max_line_length, ) + if calculator_type == "AlongSection": + calculator = AlongSection( + bounding_box=bounding_box, + sections=cross_sections_gdf, + ) + if unit_name_field != 'UNITNAME' and unit_name_field in geology_gdf.columns: geology_gdf = geology_gdf.rename(columns={unit_name_field: 'UNITNAME'}) units = geology_gdf.copy() diff --git a/loopstructural/main/vectorLayerWrapper.py b/loopstructural/main/vectorLayerWrapper.py index f744d0c..2798b87 100644 --- a/loopstructural/main/vectorLayerWrapper.py +++ b/loopstructural/main/vectorLayerWrapper.py @@ -24,6 +24,7 @@ QgsProject, QgsRaster, QgsRasterLayer, + QgsVectorLayer, QgsWkbTypes, ) from qgis.PyQt.QtCore import QDateTime, QVariant @@ -580,6 +581,204 @@ def _fields_from_dataframe(df, drop_cols=None) -> QgsFields: return fields +def _geometry_from_value(value): + """Convert shapely/QGIS geometry objects into QgsGeometry instances.""" + if value is None: + return None + if isinstance(value, QgsGeometry): + return QgsGeometry(value) + # QgsGeometry with asWkb + for attr in ("asWkb", "exportToWkb"): + method = getattr(value, attr, None) + if callable(method): + try: + data = method() + except Exception: + data = None + if data: + try: + data = bytes(data) + except Exception: + # Best-effort conversion to bytes; if this fails, leave data as-is + + pass + try: + return QgsGeometry.fromWkb(data) + except Exception: + # Best-effort conversion to bytes; if this fails, fall back to using the + # original data and let the subsequent fromWkb call handle it. + logger.debug( + "Failed to convert WKB data to bytes in _geometry_from_value", + exc_info=True, + ) + continue + # Shapely geometries expose wkb/wkt attributes + wkb_data = getattr(value, "wkb", None) + if wkb_data is not None: + try: + return QgsGeometry.fromWkb(bytes(wkb_data)) + except Exception: + pass + wkt_data = getattr(value, "wkt", None) + if wkt_data: + try: + return QgsGeometry.fromWkt(str(wkt_data)) + except Exception: + pass + return None + + +def _infer_wkb_type_from_geoms(geoms) -> QgsWkbTypes.Type: + """Infer a WKB type from a GeoSeries or iterable of geometries.""" + for geom in geoms: + qgs_geom = _geometry_from_value(geom) + if qgs_geom is not None and not qgs_geom.isEmpty(): + return qgs_geom.wkbType() + return QgsWkbTypes.Point + + +def _crs_from_geodataframe_crs(crs_info) -> QgsCoordinateReferenceSystem: + """Best-effort conversion of GeoPandas CRS metadata to QgsCoordinateReferenceSystem.""" + crs = QgsCoordinateReferenceSystem() + if crs_info is None: + return crs + # pyproj CRS exposes helpers like to_wkt/to_epsg + text = None + for attr in ("to_wkt", "to_string"): + method = getattr(crs_info, attr, None) + if callable(method): + try: + text = method() + except Exception: + text = None + if text: + break + if text: + try: + return QgsCoordinateReferenceSystem.fromWkt(text) + except Exception: + temp = QgsCoordinateReferenceSystem() + if hasattr(temp, "createFromWkt"): + try: + if temp.createFromWkt(text): + return temp + except Exception: + pass + try: + epsg = crs_info.to_epsg() + if epsg: + return QgsCoordinateReferenceSystem.fromEpsgId(int(epsg)) + except Exception: + logger.debug("Failed to convert EPSG code to QgsCoordinateReferenceSystem", exc_info=True) + pass + if isinstance(crs_info, str): + try: + temp = QgsCoordinateReferenceSystem(crs_info) + if temp.isValid(): + return temp + except Exception: + pass + return crs + + +def QgsLayerFromGeoDataFrame(geodataframe, layer_name: str = "Converted Data"): + """Create an in-memory QgsVectorLayer from a GeoPandas GeoDataFrame.""" + if geodataframe is None: + return None + geometry_series = getattr(geodataframe, "geometry", None) + if geometry_series is None: + raise ValueError("GeoDataFrame must include a geometry column.") + geometry_column = geometry_series.name + wkb_type = _infer_wkb_type_from_geoms(geometry_series) + geom_name = QgsWkbTypes.displayString(wkb_type) or "Point" + crs = _crs_from_geodataframe_crs(getattr(geodataframe, "crs", None)) + uri = geom_name + if crs.isValid(): + authid = crs.authid() + if authid: + uri = f"{geom_name}?crs={authid}" + layer = QgsVectorLayer(uri, layer_name, "memory") + if crs.isValid(): + layer.setCrs(crs) + provider = layer.dataProvider() + attribute_fields = [] + for column in geodataframe.columns: + if column == geometry_column: + continue + attribute_fields.append( + QgsField(str(column), _qvariant_type_from_dtype(geodataframe[column].dtype)) + ) + if attribute_fields: + provider.addAttributes(attribute_fields) + layer.updateFields() + non_geom_cols = [col for col in geodataframe.columns if col != geometry_column] + features = [] + for _, row in geodataframe.iterrows(): + feat = QgsFeature(layer.fields()) + attrs = [] + for column in non_geom_cols: + val = row[column] + if isinstance(val, np.generic): + try: + val = val.item() + except Exception: + pass + if pd.isna(val): + val = None + attrs.append(val) + feat.setAttributes(attrs) + geom = _geometry_from_value(row[geometry_column]) + if geom is not None and not geom.isEmpty(): + feat.setGeometry(geom) + features.append(feat) + if features: + provider.addFeatures(features) + layer.updateExtents() + return layer + + +def QgsLayerFromDataFrame(dataframe, layer_name: str = "Converted Table"): + """Create an attribute-only memory layer from a pandas-compatible DataFrame.""" + if dataframe is None: + return None + df = dataframe.copy() + geometry_series = getattr(df, "geometry", None) + geometry_name = getattr(geometry_series, "name", None) + if geometry_name and geometry_name in df.columns: + df = df.drop(columns=[geometry_name]) + + layer = QgsVectorLayer("None", layer_name, "memory") + provider = layer.dataProvider() + + attributes = [] + for column in df.columns: + attributes.append(QgsField(str(column), _qvariant_type_from_dtype(df[column].dtype))) + if attributes: + provider.addAttributes(attributes) + layer.updateFields() + + features = [] + for _, row in df.iterrows(): + feat = QgsFeature(layer.fields()) + attrs = [] + for column in df.columns: + val = row[column] + if pd.isna(val): + val = None + elif isinstance(val, np.generic): + try: + val = val.item() + except Exception: + pass + attrs.append(val) + feat.setAttributes(attrs) + features.append(feat) + if features: + provider.addFeatures(features) + layer.updateExtents() + return layer + + # ---------- main function you'll call inside processAlgorithm ---------- diff --git a/loopstructural/plugin_main.py b/loopstructural/plugin_main.py index 55199f5..d9e8332 100644 --- a/loopstructural/plugin_main.py +++ b/loopstructural/plugin_main.py @@ -140,6 +140,11 @@ def initGui(self): self.tr("LoopStructural Modelling"), self.iface.mainWindow(), ) + self.action_data_conversion = QAction( + self.tr("LoopStructural Data Conversion"), + self.iface.mainWindow(), + ) + self.action_data_conversion.triggered.connect(self.show_data_conversion_dialog) self.action_visualisation = QAction( QIcon(os.path.dirname(__file__) + "/3D_icon.png"), self.tr("LoopStructural Visualisation"), @@ -147,10 +152,12 @@ def initGui(self): ) self.toolbar.addAction(self.action_modelling) + self.toolbar.addAction(self.action_data_conversion) self.toolbar.addAction(self.action_fault_topology) # -- Menu self.iface.addPluginToMenu(__title__, self.action_settings) self.iface.addPluginToMenu(__title__, self.action_help) + self.iface.addPluginToMenu(__title__, self.action_data_conversion) self.initProcessing() # Map2Loop tool actions @@ -338,6 +345,16 @@ def show_sampler_dialog(self): ) dialog.exec_() + def show_data_conversion_dialog(self): + """Show the data conversion dialog.""" + from loopstructural.gui.data_conversion import AutomaticConversionDialog + + dialog = AutomaticConversionDialog( + self.iface.mainWindow(), + project=self.data_manager.project if self.data_manager else None, + ) + dialog.exec_() + def show_sorter_dialog(self): """Show the automatic stratigraphic sorter dialog.""" from loopstructural.gui.map2loop_tools import SorterDialog @@ -449,6 +466,7 @@ def unload(self): for attr in ( "action_help", "action_settings", + "action_data_conversion", "action_sampler", "action_sorter", "action_user_sorter", diff --git a/loopstructural/requirements.txt b/loopstructural/requirements.txt index 8ecd2f2..b582bb9 100644 --- a/loopstructural/requirements.txt +++ b/loopstructural/requirements.txt @@ -6,3 +6,4 @@ loopsolver geopandas numpy==1.26.4 map2loop~=3.3 +LoopDataConverter==0.3.0 \ No newline at end of file diff --git a/tests/unit/gui/test_ntgs_configuration_model.py b/tests/unit/gui/test_ntgs_configuration_model.py new file mode 100644 index 0000000..997ede9 --- /dev/null +++ b/tests/unit/gui/test_ntgs_configuration_model.py @@ -0,0 +1,33 @@ +import pytest + +from loopstructural.gui.data_conversion.configuration import NtgsConfig, NtgsConfigurationModel + + +def test_model_returns_deep_copy(): + base = NtgsConfig().as_dict() + model = NtgsConfigurationModel(base_config=base) + + exported = model.as_dict() + exported["geology"]["unitname_column"] = "CustomFormation" + + assert model.get_value("geology", "unitname_column") == base["geology"]["unitname_column"] + + +def test_set_value_coerces_lists(): + model = NtgsConfigurationModel() + model.set_value("geology", "ignore_lithology_codes", "cover, Unknown , ,") + + assert model.get_value("geology", "ignore_lithology_codes") == ["cover", "Unknown"] + + +def test_update_values_casts_to_string(): + model = NtgsConfigurationModel() + model.update_values("fault", {"dip_null_value": -123}) + + assert model.get_value("fault", "dip_null_value") == "-123" + + +def test_unknown_data_type_raises(): + model = NtgsConfigurationModel() + with pytest.raises(KeyError): + model.set_value("unknown", "some_field", "value")