From 3396ead1edfdf151deac59c73cab23fb8a804902 Mon Sep 17 00:00:00 2001 From: nicornk Date: Thu, 27 Nov 2025 15:44:54 +0100 Subject: [PATCH 01/12] feat: add initial furnace client --- .../clients/foundry_sql_server.py | 260 +++++++++++++++++- .../src/foundry_dev_tools/config/context.py | 5 + .../src/foundry_dev_tools/errors/handling.py | 5 +- .../src/foundry_dev_tools/errors/sql.py | 6 + .../clients/test_foundry_sql_server.py | 111 +++++++- 5 files changed, 382 insertions(+), 5 deletions(-) diff --git a/libs/foundry-dev-tools/src/foundry_dev_tools/clients/foundry_sql_server.py b/libs/foundry-dev-tools/src/foundry_dev_tools/clients/foundry_sql_server.py index 26a6d853..f8844aa0 100644 --- a/libs/foundry-dev-tools/src/foundry_dev_tools/clients/foundry_sql_server.py +++ b/libs/foundry-dev-tools/src/foundry_dev_tools/clients/foundry_sql_server.py @@ -4,7 +4,7 @@ import time import warnings -from typing import TYPE_CHECKING, Literal, overload +from typing import TYPE_CHECKING, Any, Literal, overload from foundry_dev_tools.clients.api_client import APIClient from foundry_dev_tools.errors.handling import ErrorHandlingConfig @@ -296,3 +296,261 @@ def api_queries_results( }, **kwargs, ) + + +class FoundrySqlServerClientV2(APIClient): + """FoundrySqlServerClientV2 implements the newer foundry-sql-server API. + + This client uses a different API flow compared to V1: + - Executes queries via POST to /api/ with applicationId and sql + - Polls POST to /api/status for query completion + - Retrieves results via POST to /api/stream with tickets + """ + + api_name = "foundry-sql-server" + + @overload + def query_foundry_sql( + self, + query: str, + application_id: str, + return_type: Literal["pandas"], + disable_arrow_compression: bool = ..., + timeout: int = ..., + ) -> pd.core.frame.DataFrame: ... + + @overload + def query_foundry_sql( + self, + query: str, + application_id: str, + return_type: Literal["spark"], + disable_arrow_compression: bool = ..., + timeout: int = ..., + ) -> pyspark.sql.DataFrame: ... + + @overload + def query_foundry_sql( + self, + query: str, + application_id: str, + return_type: Literal["arrow"], + disable_arrow_compression: bool = ..., + timeout: int = ..., + ) -> pa.Table: ... + + @overload + def query_foundry_sql( + self, + query: str, + application_id: str, + return_type: SQLReturnType = ..., + disable_arrow_compression: bool = ..., + timeout: int = ..., + ) -> tuple[dict, list[list]] | pd.core.frame.DataFrame | pa.Table | pyspark.sql.DataFrame: ... + + def query_foundry_sql( + self, + query: str, + return_type: SQLReturnType = "pandas", + disable_arrow_compression: bool = False, + application_id: str | None = None, + ) -> tuple[dict, list[list]] | pd.core.frame.DataFrame | pa.Table | pyspark.sql.DataFrame: + """Queries the Foundry SQL server using the V2 API. + + Uses Arrow IPC to communicate with the Foundry SQL Server Endpoint. + + Example: + df = client.query_foundry_sql( + query="SELECT * FROM `ri.foundry.main.dataset.abc` LIMIT 10", + application_id="ri.foundry.main.dataset.abc" + ) + + Args: + query: The SQL Query + return_type: See :py:class:foundry_dev_tools.foundry_api_client.SQLReturnType + disable_arrow_compression: Whether to disable Arrow compression + application_id: The application/dataset RID, defaults to foundry-dev-tools User-Agent + + Returns: + :external+pandas:py:class:`~pandas.DataFrame` | :external+pyarrow:py:class:`~pyarrow.Table` | :external+spark:py:class:`~pyspark.sql.DataFrame`: + + A pandas DataFrame, Spark DataFrame or pyarrow.Table with the result. + + Raises: + FoundrySqlQueryFailedError: If the query fails + FoundrySqlQueryClientTimedOutError: If the query times out + + """ # noqa: E501 + # Execute the query + if not application_id: + application_id = self.context.client.headers["User-Agent"] + response_json = self.api_execute( + sql=query, + application_id=application_id, + disable_arrow_compression=disable_arrow_compression, + ).json() + + query_identifier = self._extract_query_identifier(response_json) + + # Poll for completion + while response_json.get("type") != "success": + time.sleep(0.2) + response = self.api_status(query_identifier) + response_json = response.json() + + if response_json.get("type") == "failed": + raise FoundrySqlQueryFailedError(response) + + # Extract tickets from successful response + tickets = self._extract_tickets(response_json) + + # Fetch Arrow data using tickets + arrow_stream_reader = self.read_stream_results_arrow(tickets) + + if return_type == "pandas": + return arrow_stream_reader.read_pandas() + + if return_type == "spark": + from foundry_dev_tools.utils.converter.foundry_spark import ( + arrow_stream_to_spark_dataframe, + ) + + return arrow_stream_to_spark_dataframe(arrow_stream_reader) + + if return_type == "arrow": + return arrow_stream_reader.read_all() + + raise ValueError("The following return_type is not supported: " + return_type) + + def _extract_query_identifier(self, response_json: dict[str, Any]) -> dict[str, Any]: + """Extract query identifier from execute response. + + Args: + response_json: Response JSON from execute API + + Returns: + Query identifier dict + + """ + if response_json.get("type") == "pending": + return response_json["pending"]["query"] + if response_json.get("type") == "success": + return response_json["success"]["query"] + msg = f"Unexpected response type: {response_json.get('type')}" + raise ValueError(msg) + + def _extract_tickets(self, response_json: dict[str, Any]) -> list[str]: + """Extract tickets from success response. + + Args: + response_json: Success response JSON from status API + + Returns: + List of tickets for fetching results + + """ + if response_json.get("type") != "success": + msg = f"Expected success response, got: {response_json.get('type')}" + raise ValueError(msg) + + chunks = response_json["success"]["result"]["interactive"]["chunks"] + return [chunk["ticket"] for chunk in chunks] + + def read_stream_results_arrow(self, tickets: list[str]) -> pa.ipc.RecordBatchStreamReader: + """Fetch query results using tickets and return Arrow stream reader. + + Args: + tickets: List of tickets from status API success response + + Returns: + Arrow RecordBatchStreamReader + + """ + from foundry_dev_tools._optional.pyarrow import pa + + response = self._api_stream_ticket(tickets) + + return pa.ipc.RecordBatchStreamReader(response.raw) + + def api_execute( + self, + sql: str, + application_id: str, + disable_arrow_compression: bool = False, + **kwargs, + ) -> requests.Response: + """Execute a SQL query via the V2 API. + + Args: + sql: The SQL query to execute + application_id: The application/dataset RID + disable_arrow_compression: Whether to disable Arrow compression + **kwargs: gets passed to :py:meth:`APIClient.api_request` + + Returns: + Response with query execution status + + """ + return self.api_request( + "POST", + "", # Root endpoint /api/ + json={ + "applicationId": application_id, + "sql": sql, + "disableArrowCompression": disable_arrow_compression, + }, + **kwargs, + ) + + def api_status( + self, + query_identifier: dict[str, Any], + **kwargs, + ) -> requests.Response: + """Get the status of a SQL query via the V2 API. + + Args: + query_identifier: Query identifier dict (e.g., {"type": "interactive", "interactive": "query-id"}) + **kwargs: gets passed to :py:meth:`APIClient.api_request` + + Returns: + Response with query status + + """ + return self.api_request( + "POST", + "status", + json={ + "query": query_identifier, + }, + **kwargs, + ) + + def _api_stream_ticket( + self, + tickets: list[str], + **kwargs, + ) -> requests.Response: + """Fetch query results using tickets via the V2 API. + + Args: + tickets: List of tickets from status API success response + **kwargs: gets passed to :py:meth:`APIClient.api_request` + + Returns: + Response with Arrow-encoded query results + + """ + return self.api_request( + "POST", + "stream", + json={ + "tickets": tickets, + }, + headers={ + "Accept": "application/octet-stream", + }, + stream=True, + **kwargs, + ) diff --git a/libs/foundry-dev-tools/src/foundry_dev_tools/config/context.py b/libs/foundry-dev-tools/src/foundry_dev_tools/config/context.py index 7a956dcb..f0ce36e5 100644 --- a/libs/foundry-dev-tools/src/foundry_dev_tools/config/context.py +++ b/libs/foundry-dev-tools/src/foundry_dev_tools/config/context.py @@ -152,6 +152,11 @@ def foundry_sql_server(self) -> foundry_sql_server.FoundrySqlServerClient: """Returns :py:class:`foundry_dev_tools.clients.foundry_sql_server.FoundrySqlServerClient`.""" return foundry_sql_server.FoundrySqlServerClient(self) + @cached_property + def foundry_sql_server_v2(self) -> foundry_sql_server.FoundrySqlServerClientV2: + """Returns :py:class:`foundry_dev_tools.clients.foundry_sql_server.FoundrySqlServerClientV2`.""" + return foundry_sql_server.FoundrySqlServerClientV2(self) + @cached_property def build2(self) -> build2.Build2Client: """Returns :py:class:`foundry_dev_tools.clients.build2.Build2Client`.""" diff --git a/libs/foundry-dev-tools/src/foundry_dev_tools/errors/handling.py b/libs/foundry-dev-tools/src/foundry_dev_tools/errors/handling.py index 12871238..4c502770 100644 --- a/libs/foundry-dev-tools/src/foundry_dev_tools/errors/handling.py +++ b/libs/foundry-dev-tools/src/foundry_dev_tools/errors/handling.py @@ -47,9 +47,7 @@ ) from foundry_dev_tools.errors.meta import FoundryAPIError from foundry_dev_tools.errors.multipass import DuplicateGroupNameError -from foundry_dev_tools.errors.sql import ( - FoundrySqlQueryFailedError, -) +from foundry_dev_tools.errors.sql import FoundrySqlQueryFailedError, FurnaceSqlSqlParseError from foundry_dev_tools.utils.misc import decamelize LOGGER = logging.getLogger(__name__) @@ -59,6 +57,7 @@ "DataProxy:SchemaNotFound": DatasetHasNoSchemaError, "DataProxy:FallbackBranchesNotSpecifiedInQuery": BranchNotFoundError, "DataProxy:BadSqlQuery": FoundrySqlQueryFailedError, + "FurnaceSql:SqlParseError": FurnaceSqlSqlParseError, "DataProxy:DatasetNotFound": DatasetNotFoundError, "Catalog:DuplicateDatasetName": DatasetAlreadyExistsError, "Catalog:DatasetsNotFound": DatasetNotFoundError, diff --git a/libs/foundry-dev-tools/src/foundry_dev_tools/errors/sql.py b/libs/foundry-dev-tools/src/foundry_dev_tools/errors/sql.py index efe789a5..567cd5d9 100644 --- a/libs/foundry-dev-tools/src/foundry_dev_tools/errors/sql.py +++ b/libs/foundry-dev-tools/src/foundry_dev_tools/errors/sql.py @@ -20,6 +20,12 @@ def __init__(self, response: requests.Response): super().__init__(response=response, info=self.error_message) +class FurnaceSqlSqlParseError(FoundryAPIError): + """Exception is thrown when SQL Query is not valid.""" + + message = "Foundry SQL Query Parsing Failed." + + class FoundrySqlQueryClientTimedOutError(FoundryAPIError): """Exception is thrown when the Query execution time exceeded the client timeout value.""" diff --git a/tests/integration/clients/test_foundry_sql_server.py b/tests/integration/clients/test_foundry_sql_server.py index 929cddde..78d2a150 100644 --- a/tests/integration/clients/test_foundry_sql_server.py +++ b/tests/integration/clients/test_foundry_sql_server.py @@ -2,7 +2,11 @@ import pytest from foundry_dev_tools.errors.dataset import BranchNotFoundError, DatasetHasNoSchemaError, DatasetNotFoundError -from foundry_dev_tools.errors.sql import FoundrySqlQueryFailedError, FoundrySqlSerializationFormatNotImplementedError +from foundry_dev_tools.errors.sql import ( + FoundrySqlQueryFailedError, + FoundrySqlSerializationFormatNotImplementedError, + FurnaceSqlSqlParseError, +) from tests.integration.conftest import TEST_SINGLETON @@ -67,3 +71,108 @@ def test_legacy_fallback(mocker): TEST_SINGLETON.ctx.foundry_sql_server.query_foundry_sql(f"SELECT * FROM `{TEST_SINGLETON.iris_new.rid}`") query_foundry_sql_legacy_spy.assert_called() + + +# V2 Client Tests + + +def test_v2_smoke(): + """Test basic V2 client functionality with a simple query.""" + one_row_one_column = TEST_SINGLETON.ctx.foundry_sql_server_v2.query_foundry_sql( + query=f"SELECT sepal_width FROM `{TEST_SINGLETON.iris_new.rid}` LIMIT 1", + application_id=TEST_SINGLETON.iris_new.rid, + ) + assert one_row_one_column.shape == (1, 1) + + one_row_one_column = TEST_SINGLETON.ctx.foundry_sql_server_v2.query_foundry_sql( + query=f"SELECT sepal_width FROM `{TEST_SINGLETON.iris_new.rid}` LIMIT 1", + application_id=TEST_SINGLETON.iris_new.rid, + return_type="arrow", + ) + assert one_row_one_column.num_columns == 1 + assert one_row_one_column.num_rows == 1 + assert one_row_one_column.column_names == ["sepal_width"] + + +def test_v2_multiple_rows(): + """Test V2 client with multiple rows.""" + result = TEST_SINGLETON.ctx.foundry_sql_server_v2.query_foundry_sql( + query=f"SELECT * FROM `{TEST_SINGLETON.iris_new.rid}` LIMIT 10", + application_id=TEST_SINGLETON.iris_new.rid, + ) + assert result.shape[0] == 10 + assert result.shape[1] == 5 # iris dataset has 5 columns + + +def test_v2_return_type_arrow(): + """Test V2 client with Arrow return type.""" + import pyarrow as pa + + result = TEST_SINGLETON.ctx.foundry_sql_server_v2.query_foundry_sql( + query=f"SELECT * FROM `{TEST_SINGLETON.iris_new.rid}` LIMIT 5", + application_id=TEST_SINGLETON.iris_new.rid, + return_type="arrow", + ) + assert isinstance(result, pa.Table) + assert result.num_rows == 5 + + +def test_v2_return_type_raw_not_supported(): + """Test V2 client with raw return type.""" + with pytest.raises(ValueError, match="The following return_type is not supported: .+"): + schema, rows = TEST_SINGLETON.ctx.foundry_sql_server_v2.query_foundry_sql( + query=f"SELECT sepal_width, sepal_length FROM `{TEST_SINGLETON.iris_new.rid}` LIMIT 3", + application_id=TEST_SINGLETON.iris_new.rid, + return_type="raw", + ) + + +def test_v2_aggregation_query(): + """Test V2 client with aggregation query.""" + result = TEST_SINGLETON.ctx.foundry_sql_server_v2.query_foundry_sql( + query=f""" + SELECT + COUNT(*) as total_count, + AVG(sepal_width) as avg_sepal_width + FROM `{TEST_SINGLETON.iris_new.rid}` + """, + application_id=TEST_SINGLETON.iris_new.rid, + ) + assert result.shape == (1, 2) + assert "total_count" in result.columns + assert "avg_sepal_width" in result.columns + + +def test_v2_query_failed(): + """Test V2 client with invalid SQL query.""" + with pytest.raises(FurnaceSqlSqlParseError): + TEST_SINGLETON.ctx.foundry_sql_server_v2.query_foundry_sql( + query=f"SELECT foo, bar, FROM `{TEST_SINGLETON.iris_new.rid}` LIMIT 100", + application_id=TEST_SINGLETON.iris_new.rid, + ) + + +def test_v2_disable_arrow_compression(): + """Test V2 client with arrow compression disabled.""" + result = TEST_SINGLETON.ctx.foundry_sql_server_v2.query_foundry_sql( + query=f"SELECT * FROM `{TEST_SINGLETON.iris_new.rid}` LIMIT 5", + application_id=TEST_SINGLETON.iris_new.rid, + disable_arrow_compression=True, + ) + assert result.shape[0] == 5 + + +def test_v2_with_where_clause(): + """Test V2 client with WHERE clause.""" + result = TEST_SINGLETON.ctx.foundry_sql_server_v2.query_foundry_sql( + query=f""" + SELECT * FROM `{TEST_SINGLETON.iris_new.rid}` + WHERE is_setosa = 'setosa' + LIMIT 20 + """, + application_id=TEST_SINGLETON.iris_new.rid, + ) + assert result.shape[0] <= 20 + # Verify all returned rows have is_setosa = 'setosa' + if result.shape[0] > 0: + assert all(result["is_setosa"] == "setosa") From 9c7ec17b91386195d9a83cf1334de4cc79999c99 Mon Sep 17 00:00:00 2001 From: nicornk Date: Thu, 27 Nov 2025 16:18:05 +0100 Subject: [PATCH 02/12] fix decode_content --- .../src/foundry_dev_tools/clients/foundry_sql_server.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/libs/foundry-dev-tools/src/foundry_dev_tools/clients/foundry_sql_server.py b/libs/foundry-dev-tools/src/foundry_dev_tools/clients/foundry_sql_server.py index f8844aa0..4d74f61e 100644 --- a/libs/foundry-dev-tools/src/foundry_dev_tools/clients/foundry_sql_server.py +++ b/libs/foundry-dev-tools/src/foundry_dev_tools/clients/foundry_sql_server.py @@ -452,6 +452,7 @@ def _extract_tickets(self, response_json: dict[str, Any]) -> list[str]: """ if response_json.get("type") != "success": msg = f"Expected success response, got: {response_json.get('type')}" + raise ValueError(msg) chunks = response_json["success"]["result"]["interactive"]["chunks"] @@ -470,6 +471,7 @@ def read_stream_results_arrow(self, tickets: list[str]) -> pa.ipc.RecordBatchStr from foundry_dev_tools._optional.pyarrow import pa response = self._api_stream_ticket(tickets) + response.raw.decode_content = True return pa.ipc.RecordBatchStreamReader(response.raw) From fd9ad6e1262dc27d32bdf3c5bc4d7cf3d5e7f273 Mon Sep 17 00:00:00 2001 From: nicornk Date: Tue, 17 Feb 2026 12:25:33 +0100 Subject: [PATCH 03/12] Fix response format --- .../foundry_dev_tools/clients/foundry_sql_server.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/libs/foundry-dev-tools/src/foundry_dev_tools/clients/foundry_sql_server.py b/libs/foundry-dev-tools/src/foundry_dev_tools/clients/foundry_sql_server.py index 4d74f61e..dbb579e3 100644 --- a/libs/foundry-dev-tools/src/foundry_dev_tools/clients/foundry_sql_server.py +++ b/libs/foundry-dev-tools/src/foundry_dev_tools/clients/foundry_sql_server.py @@ -2,6 +2,7 @@ from __future__ import annotations +import logging import time import warnings from typing import TYPE_CHECKING, Any, Literal, overload @@ -22,6 +23,8 @@ import pyspark import requests +LOGGER = logging.getLogger(__name__) + class FoundrySqlServerClient(APIClient): """FoundrySqlServerClient class that implements methods from the 'foundry-sql-server' API.""" @@ -433,12 +436,10 @@ def _extract_query_identifier(self, response_json: dict[str, Any]) -> dict[str, Query identifier dict """ - if response_json.get("type") == "pending": - return response_json["pending"]["query"] - if response_json.get("type") == "success": - return response_json["success"]["query"] - msg = f"Unexpected response type: {response_json.get('type')}" - raise ValueError(msg) + if response_json["type"] == "triggered" and "plan" in response_json["triggered"]: + plan = response_json["triggered"]["plan"] + LOGGER.debug("plan %s", plan) + return response_json[response_json["type"]]["query"] def _extract_tickets(self, response_json: dict[str, Any]) -> list[str]: """Extract tickets from success response. From 4a23420f9f0b7730f6bbd5845e70feaef9464080 Mon Sep 17 00:00:00 2001 From: nicornk Date: Tue, 17 Feb 2026 13:30:16 +0100 Subject: [PATCH 04/12] support polars in v2 as well. --- .../clients/foundry_sql_server.py | 28 ++++++++++++++++--- .../clients/test_foundry_sql_server.py | 10 +++++++ 2 files changed, 34 insertions(+), 4 deletions(-) diff --git a/libs/foundry-dev-tools/src/foundry_dev_tools/clients/foundry_sql_server.py b/libs/foundry-dev-tools/src/foundry_dev_tools/clients/foundry_sql_server.py index dbb579e3..a83ed3bb 100644 --- a/libs/foundry-dev-tools/src/foundry_dev_tools/clients/foundry_sql_server.py +++ b/libs/foundry-dev-tools/src/foundry_dev_tools/clients/foundry_sql_server.py @@ -322,6 +322,16 @@ def query_foundry_sql( timeout: int = ..., ) -> pd.core.frame.DataFrame: ... + @overload + def query_foundry_sql( + self, + query: str, + return_type: Literal["polars"], + branch: Ref = ..., + sql_dialect: SqlDialect = ..., + timeout: int = ..., + ) -> pl.DataFrame: ... + @overload def query_foundry_sql( self, @@ -350,7 +360,7 @@ def query_foundry_sql( return_type: SQLReturnType = ..., disable_arrow_compression: bool = ..., timeout: int = ..., - ) -> tuple[dict, list[list]] | pd.core.frame.DataFrame | pa.Table | pyspark.sql.DataFrame: ... + ) -> tuple[dict, list[list]] | pd.core.frame.DataFrame | pl.DataFrame | pa.Table | pyspark.sql.DataFrame: ... def query_foundry_sql( self, @@ -358,7 +368,7 @@ def query_foundry_sql( return_type: SQLReturnType = "pandas", disable_arrow_compression: bool = False, application_id: str | None = None, - ) -> tuple[dict, list[list]] | pd.core.frame.DataFrame | pa.Table | pyspark.sql.DataFrame: + ) -> tuple[dict, list[list]] | pd.core.frame.DataFrame | pl.DataFrame | pa.Table | pyspark.sql.DataFrame: """Queries the Foundry SQL server using the V2 API. Uses Arrow IPC to communicate with the Foundry SQL Server Endpoint. @@ -376,9 +386,9 @@ def query_foundry_sql( application_id: The application/dataset RID, defaults to foundry-dev-tools User-Agent Returns: - :external+pandas:py:class:`~pandas.DataFrame` | :external+pyarrow:py:class:`~pyarrow.Table` | :external+spark:py:class:`~pyspark.sql.DataFrame`: + :external+pandas:py:class:`~pandas.DataFrame` | :external+polars:py:class:`~polars.DataFrame` | :external+pyarrow:py:class:`~pyarrow.Table` | :external+spark:py:class:`~pyspark.sql.DataFrame`: - A pandas DataFrame, Spark DataFrame or pyarrow.Table with the result. + A pandas DataFrame, polars, Spark DataFrame or pyarrow.Table with the result. Raises: FoundrySqlQueryFailedError: If the query fails @@ -414,6 +424,16 @@ def query_foundry_sql( if return_type == "pandas": return arrow_stream_reader.read_pandas() + if return_type == "polars": + # The FakeModule implementation used in the _optional packages + # throws an ImportError when trying to access attributes of the module. + # This ImportError is caught below to fall back to query_foundry_sql_legacy + # which will again raise an ImportError when polars is not installed. + from foundry_dev_tools._optional.polars import pl + + arrow_table = arrow_stream_reader.read_all() + return pl.from_arrow(arrow_table) + if return_type == "spark": from foundry_dev_tools.utils.converter.foundry_spark import ( arrow_stream_to_spark_dataframe, diff --git a/tests/integration/clients/test_foundry_sql_server.py b/tests/integration/clients/test_foundry_sql_server.py index 78d2a150..d353b150 100644 --- a/tests/integration/clients/test_foundry_sql_server.py +++ b/tests/integration/clients/test_foundry_sql_server.py @@ -176,3 +176,13 @@ def test_v2_with_where_clause(): # Verify all returned rows have is_setosa = 'setosa' if result.shape[0] > 0: assert all(result["is_setosa"] == "setosa") + + +def test_v2_polars_return_type(): + polars_df = TEST_SINGLETON.ctx.foundry_sql_server_v2.query_foundry_sql( + f"SELECT sepal_length FROM `{TEST_SINGLETON.iris_new.rid}` LIMIT 2", + return_type="polars", + ) + assert isinstance(polars_df, pl.DataFrame) + assert polars_df.height == 2 + assert polars_df.width == 1 From f948c2d9e5246c149a14401cf0bf72d4263ea4ed Mon Sep 17 00:00:00 2001 From: nicornk Date: Wed, 18 Feb 2026 15:48:34 +0100 Subject: [PATCH 05/12] adjust to new furnace API --- .../clients/foundry_sql_server.py | 163 ++++++++++-------- .../src/foundry_dev_tools/errors/handling.py | 1 + .../src/foundry_dev_tools/utils/api_types.py | 3 + .../clients/test_foundry_sql_server.py | 10 -- 4 files changed, 93 insertions(+), 84 deletions(-) diff --git a/libs/foundry-dev-tools/src/foundry_dev_tools/clients/foundry_sql_server.py b/libs/foundry-dev-tools/src/foundry_dev_tools/clients/foundry_sql_server.py index a83ed3bb..87328e01 100644 --- a/libs/foundry-dev-tools/src/foundry_dev_tools/clients/foundry_sql_server.py +++ b/libs/foundry-dev-tools/src/foundry_dev_tools/clients/foundry_sql_server.py @@ -14,7 +14,7 @@ FoundrySqlQueryFailedError, FoundrySqlSerializationFormatNotImplementedError, ) -from foundry_dev_tools.utils.api_types import Ref, SqlDialect, SQLReturnType, assert_in_literal +from foundry_dev_tools.utils.api_types import ArrowCompressionCodec, Ref, SqlDialect, SQLReturnType, assert_in_literal if TYPE_CHECKING: import pandas as pd @@ -316,9 +316,10 @@ class FoundrySqlServerClientV2(APIClient): def query_foundry_sql( self, query: str, - application_id: str, return_type: Literal["pandas"], - disable_arrow_compression: bool = ..., + branch: Ref = ..., + sql_dialect: SqlDialect = ..., + arrow_compression_codec: ArrowCompressionCodec = ..., timeout: int = ..., ) -> pd.core.frame.DataFrame: ... @@ -329,6 +330,7 @@ def query_foundry_sql( return_type: Literal["polars"], branch: Ref = ..., sql_dialect: SqlDialect = ..., + arrow_compression_codec: ArrowCompressionCodec = ..., timeout: int = ..., ) -> pl.DataFrame: ... @@ -336,9 +338,10 @@ def query_foundry_sql( def query_foundry_sql( self, query: str, - application_id: str, return_type: Literal["spark"], - disable_arrow_compression: bool = ..., + branch: Ref = ..., + sql_dialect: SqlDialect = ..., + arrow_compression_codec: ArrowCompressionCodec = ..., timeout: int = ..., ) -> pyspark.sql.DataFrame: ... @@ -346,9 +349,10 @@ def query_foundry_sql( def query_foundry_sql( self, query: str, - application_id: str, return_type: Literal["arrow"], - disable_arrow_compression: bool = ..., + branch: Ref = ..., + sql_dialect: SqlDialect = ..., + arrow_compression_codec: ArrowCompressionCodec = ..., timeout: int = ..., ) -> pa.Table: ... @@ -356,9 +360,10 @@ def query_foundry_sql( def query_foundry_sql( self, query: str, - application_id: str, return_type: SQLReturnType = ..., - disable_arrow_compression: bool = ..., + branch: Ref = ..., + sql_dialect: SqlDialect = ..., + arrow_compression_codec: ArrowCompressionCodec = ..., timeout: int = ..., ) -> tuple[dict, list[list]] | pd.core.frame.DataFrame | pl.DataFrame | pa.Table | pyspark.sql.DataFrame: ... @@ -366,8 +371,10 @@ def query_foundry_sql( self, query: str, return_type: SQLReturnType = "pandas", - disable_arrow_compression: bool = False, - application_id: str | None = None, + branch: Ref = "master", + sql_dialect: SqlDialect = "SPARK", + arrow_compression_codec: ArrowCompressionCodec = "NONE", + timeout: int = 600, ) -> tuple[dict, list[list]] | pd.core.frame.DataFrame | pl.DataFrame | pa.Table | pyspark.sql.DataFrame: """Queries the Foundry SQL server using the V2 API. @@ -375,15 +382,16 @@ def query_foundry_sql( Example: df = client.query_foundry_sql( - query="SELECT * FROM `ri.foundry.main.dataset.abc` LIMIT 10", - application_id="ri.foundry.main.dataset.abc" + query="SELECT * FROM `ri.foundry.main.dataset.abc` LIMIT 10" ) Args: query: The SQL Query return_type: See :py:class:foundry_dev_tools.foundry_api_client.SQLReturnType - disable_arrow_compression: Whether to disable Arrow compression - application_id: The application/dataset RID, defaults to foundry-dev-tools User-Agent + branch: The dataset branch to query + sql_dialect: The SQL dialect to use + arrow_compression_codec: Arrow compression codec (NONE, LZ4, ZSTD) + timeout: Query timeout in seconds Returns: :external+pandas:py:class:`~pandas.DataFrame` | :external+polars:py:class:`~polars.DataFrame` | :external+pyarrow:py:class:`~pyarrow.Table` | :external+spark:py:class:`~pyspark.sql.DataFrame`: @@ -395,31 +403,29 @@ def query_foundry_sql( FoundrySqlQueryClientTimedOutError: If the query times out """ # noqa: E501 - # Execute the query - if not application_id: - application_id = self.context.client.headers["User-Agent"] - response_json = self.api_execute( - sql=query, - application_id=application_id, - disable_arrow_compression=disable_arrow_compression, + response_json = self.api_query( + query=query, dialect=sql_dialect, branch=branch, arrow_compression_codec=arrow_compression_codec ).json() - query_identifier = self._extract_query_identifier(response_json) + query_handle = self._extract_query_handle(response_json) + start_time = time.time() # Poll for completion - while response_json.get("type") != "success": + while response_json.get("status", {}).get("type") != "ready": time.sleep(0.2) - response = self.api_status(query_identifier) + response = self.api_status(query_handle) response_json = response.json() - if response_json.get("type") == "failed": + if response_json.get("status", {}).get("type") == "failed": raise FoundrySqlQueryFailedError(response) + if time.time() > start_time + timeout: + raise FoundrySqlQueryClientTimedOutError(response, timeout=timeout) # Extract tickets from successful response - tickets = self._extract_tickets(response_json) + ticket = self._extract_ticket(response_json) # Fetch Arrow data using tickets - arrow_stream_reader = self.read_stream_results_arrow(tickets) + arrow_stream_reader = self.read_stream_results_arrow(ticket) if return_type == "pandas": return arrow_stream_reader.read_pandas() @@ -446,22 +452,20 @@ def query_foundry_sql( raise ValueError("The following return_type is not supported: " + return_type) - def _extract_query_identifier(self, response_json: dict[str, Any]) -> dict[str, Any]: - """Extract query identifier from execute response. + def _extract_query_handle(self, response_json: dict[str, Any]) -> dict[str, Any]: + """Extract query handle from execute response. Args: response_json: Response JSON from execute API + Returns: - Query identifier dict + Query handle dict """ - if response_json["type"] == "triggered" and "plan" in response_json["triggered"]: - plan = response_json["triggered"]["plan"] - LOGGER.debug("plan %s", plan) - return response_json[response_json["type"]]["query"] + return response_json[response_json["type"]]["queryHandle"] - def _extract_tickets(self, response_json: dict[str, Any]) -> list[str]: + def _extract_ticket(self, response_json: dict[str, Any]) -> dict[str, Any]: """Extract tickets from success response. Args: @@ -471,19 +475,23 @@ def _extract_tickets(self, response_json: dict[str, Any]) -> list[str]: List of tickets for fetching results """ - if response_json.get("type") != "success": - msg = f"Expected success response, got: {response_json.get('type')}" - - raise ValueError(msg) - - chunks = response_json["success"]["result"]["interactive"]["chunks"] - return [chunk["ticket"] for chunk in chunks] - - def read_stream_results_arrow(self, tickets: list[str]) -> pa.ipc.RecordBatchStreamReader: + # we combine all tickets into one to get the full data + # if performance is a concern this should be done in parallel + return { + "id": 0, + "tickets": [ + ticket + for ticket_group in response_json["status"]["ready"]["tickets"] + for ticket in ticket_group["tickets"] + ], + "type": "furnace", + } + + def read_stream_results_arrow(self, ticket: dict[str, Any]) -> pa.ipc.RecordBatchStreamReader: """Fetch query results using tickets and return Arrow stream reader. Args: - tickets: List of tickets from status API success response + ticket: dict of tickets e.g. { "id": 0, "tickets": ["ey...", ...], "type": "furnace", } Returns: Arrow RecordBatchStreamReader @@ -491,50 +499,60 @@ def read_stream_results_arrow(self, tickets: list[str]) -> pa.ipc.RecordBatchStr """ from foundry_dev_tools._optional.pyarrow import pa - response = self._api_stream_ticket(tickets) + response = self.api_stream_ticket(ticket) response.raw.decode_content = True return pa.ipc.RecordBatchStreamReader(response.raw) - def api_execute( + def api_query( self, - sql: str, - application_id: str, - disable_arrow_compression: bool = False, + query: str, + dialect: SqlDialect, + branch: Ref, + arrow_compression_codec: ArrowCompressionCodec = "NONE", **kwargs, ) -> requests.Response: """Execute a SQL query via the V2 API. Args: - sql: The SQL query to execute - application_id: The application/dataset RID - disable_arrow_compression: Whether to disable Arrow compression + query: The SQL query string + dialect: The SQL dialect to use + branch: The dataset branch to query + arrow_compression_codec: Arrow compression codec (NONE, LZ4, ZSTD) **kwargs: gets passed to :py:meth:`APIClient.api_request` Returns: - Response with query execution status + Response with query handle and initial status """ return self.api_request( "POST", - "", # Root endpoint /api/ + "sql-endpoint/v1/queries/query", json={ - "applicationId": application_id, - "sql": sql, - "disableArrowCompression": disable_arrow_compression, + "querySpec": { + "query": query, + "tableProviders": {}, + "dialect": dialect, + "options": {"options": [{"option": "arrowCompressionCodec", "value": arrow_compression_codec}]}, + }, + "executionParams": { + "defaultBranchIds": [{"type": "datasetBranch", "datasetBranch": branch}], + "resultFormat": "ARROW", + "resultMode": "AUTO", + }, }, **kwargs, ) def api_status( self, - query_identifier: dict[str, Any], + query_handle: dict[str, Any], **kwargs, ) -> requests.Response: """Get the status of a SQL query via the V2 API. Args: - query_identifier: Query identifier dict (e.g., {"type": "interactive", "interactive": "query-id"}) + query_handle: Query handle dict from execute response **kwargs: gets passed to :py:meth:`APIClient.api_request` Returns: @@ -543,34 +561,31 @@ def api_status( """ return self.api_request( "POST", - "status", - json={ - "query": query_identifier, - }, + "sql-endpoint/v1/queries/status", + json=query_handle, **kwargs, ) - def _api_stream_ticket( + def api_stream_ticket( self, - tickets: list[str], + ticket: dict, **kwargs, ) -> requests.Response: - """Fetch query results using tickets via the V2 API. + """Stream query results using a ticket via the V2 API. Args: - tickets: List of tickets from status API success response + ticket: Ticket dict containing id, tickets list, and type. + Example: {"id": 0, "tickets": ["eyJhbGc...", "eyJhbGc..."], "type": "furnace"} **kwargs: gets passed to :py:meth:`APIClient.api_request` Returns: - Response with Arrow-encoded query results + Response with streaming Arrow data """ return self.api_request( "POST", - "stream", - json={ - "tickets": tickets, - }, + "sql-endpoint/v1/queries/stream", + json=ticket, headers={ "Accept": "application/octet-stream", }, diff --git a/libs/foundry-dev-tools/src/foundry_dev_tools/errors/handling.py b/libs/foundry-dev-tools/src/foundry_dev_tools/errors/handling.py index 4c502770..06d9363a 100644 --- a/libs/foundry-dev-tools/src/foundry_dev_tools/errors/handling.py +++ b/libs/foundry-dev-tools/src/foundry_dev_tools/errors/handling.py @@ -58,6 +58,7 @@ "DataProxy:FallbackBranchesNotSpecifiedInQuery": BranchNotFoundError, "DataProxy:BadSqlQuery": FoundrySqlQueryFailedError, "FurnaceSql:SqlParseError": FurnaceSqlSqlParseError, + "SqlQueryService:SqlSyntaxError": FurnaceSqlSqlParseError, "DataProxy:DatasetNotFound": DatasetNotFoundError, "Catalog:DuplicateDatasetName": DatasetAlreadyExistsError, "Catalog:DatasetsNotFound": DatasetNotFoundError, diff --git a/libs/foundry-dev-tools/src/foundry_dev_tools/utils/api_types.py b/libs/foundry-dev-tools/src/foundry_dev_tools/utils/api_types.py index 0eb3a9c6..99f486a1 100644 --- a/libs/foundry-dev-tools/src/foundry_dev_tools/utils/api_types.py +++ b/libs/foundry-dev-tools/src/foundry_dev_tools/utils/api_types.py @@ -95,6 +95,9 @@ def assert_in_literal(option, literal, variable_name) -> None: # noqa: ANN001 SqlDialect = Literal["ANSI", "SPARK"] """The SQL Dialect for Foundry SQL queries.""" +ArrowCompressionCodec = Literal["NONE", "LZ4", "ZSTD"] +"""The Arrow compression codec for Foundry SQL queries.""" + SQLReturnType = Literal["pandas", "polars", "spark", "arrow", "raw"] """The return_types for sql queries. diff --git a/tests/integration/clients/test_foundry_sql_server.py b/tests/integration/clients/test_foundry_sql_server.py index d353b150..c832d88c 100644 --- a/tests/integration/clients/test_foundry_sql_server.py +++ b/tests/integration/clients/test_foundry_sql_server.py @@ -80,13 +80,11 @@ def test_v2_smoke(): """Test basic V2 client functionality with a simple query.""" one_row_one_column = TEST_SINGLETON.ctx.foundry_sql_server_v2.query_foundry_sql( query=f"SELECT sepal_width FROM `{TEST_SINGLETON.iris_new.rid}` LIMIT 1", - application_id=TEST_SINGLETON.iris_new.rid, ) assert one_row_one_column.shape == (1, 1) one_row_one_column = TEST_SINGLETON.ctx.foundry_sql_server_v2.query_foundry_sql( query=f"SELECT sepal_width FROM `{TEST_SINGLETON.iris_new.rid}` LIMIT 1", - application_id=TEST_SINGLETON.iris_new.rid, return_type="arrow", ) assert one_row_one_column.num_columns == 1 @@ -98,7 +96,6 @@ def test_v2_multiple_rows(): """Test V2 client with multiple rows.""" result = TEST_SINGLETON.ctx.foundry_sql_server_v2.query_foundry_sql( query=f"SELECT * FROM `{TEST_SINGLETON.iris_new.rid}` LIMIT 10", - application_id=TEST_SINGLETON.iris_new.rid, ) assert result.shape[0] == 10 assert result.shape[1] == 5 # iris dataset has 5 columns @@ -110,7 +107,6 @@ def test_v2_return_type_arrow(): result = TEST_SINGLETON.ctx.foundry_sql_server_v2.query_foundry_sql( query=f"SELECT * FROM `{TEST_SINGLETON.iris_new.rid}` LIMIT 5", - application_id=TEST_SINGLETON.iris_new.rid, return_type="arrow", ) assert isinstance(result, pa.Table) @@ -122,7 +118,6 @@ def test_v2_return_type_raw_not_supported(): with pytest.raises(ValueError, match="The following return_type is not supported: .+"): schema, rows = TEST_SINGLETON.ctx.foundry_sql_server_v2.query_foundry_sql( query=f"SELECT sepal_width, sepal_length FROM `{TEST_SINGLETON.iris_new.rid}` LIMIT 3", - application_id=TEST_SINGLETON.iris_new.rid, return_type="raw", ) @@ -136,7 +131,6 @@ def test_v2_aggregation_query(): AVG(sepal_width) as avg_sepal_width FROM `{TEST_SINGLETON.iris_new.rid}` """, - application_id=TEST_SINGLETON.iris_new.rid, ) assert result.shape == (1, 2) assert "total_count" in result.columns @@ -148,7 +142,6 @@ def test_v2_query_failed(): with pytest.raises(FurnaceSqlSqlParseError): TEST_SINGLETON.ctx.foundry_sql_server_v2.query_foundry_sql( query=f"SELECT foo, bar, FROM `{TEST_SINGLETON.iris_new.rid}` LIMIT 100", - application_id=TEST_SINGLETON.iris_new.rid, ) @@ -156,8 +149,6 @@ def test_v2_disable_arrow_compression(): """Test V2 client with arrow compression disabled.""" result = TEST_SINGLETON.ctx.foundry_sql_server_v2.query_foundry_sql( query=f"SELECT * FROM `{TEST_SINGLETON.iris_new.rid}` LIMIT 5", - application_id=TEST_SINGLETON.iris_new.rid, - disable_arrow_compression=True, ) assert result.shape[0] == 5 @@ -170,7 +161,6 @@ def test_v2_with_where_clause(): WHERE is_setosa = 'setosa' LIMIT 20 """, - application_id=TEST_SINGLETON.iris_new.rid, ) assert result.shape[0] <= 20 # Verify all returned rows have is_setosa = 'setosa' From cec7efaa10193e407e1f371c0a4ced7b9314b396 Mon Sep 17 00:00:00 2001 From: nicornk Date: Fri, 20 Feb 2026 10:49:20 +0100 Subject: [PATCH 06/12] add unit tests --- .../clients/foundry_sql_server.py | 39 +++++-- .../src/foundry_dev_tools/utils/api_types.py | 3 + .../clients/test_foundry_sql_server.py | 89 +++++++++++++++ tests/unit/clients/test_foundry_sql_server.py | 103 ++++++++++++++++++ 4 files changed, 224 insertions(+), 10 deletions(-) diff --git a/libs/foundry-dev-tools/src/foundry_dev_tools/clients/foundry_sql_server.py b/libs/foundry-dev-tools/src/foundry_dev_tools/clients/foundry_sql_server.py index 87328e01..f613e775 100644 --- a/libs/foundry-dev-tools/src/foundry_dev_tools/clients/foundry_sql_server.py +++ b/libs/foundry-dev-tools/src/foundry_dev_tools/clients/foundry_sql_server.py @@ -14,7 +14,14 @@ FoundrySqlQueryFailedError, FoundrySqlSerializationFormatNotImplementedError, ) -from foundry_dev_tools.utils.api_types import ArrowCompressionCodec, Ref, SqlDialect, SQLReturnType, assert_in_literal +from foundry_dev_tools.utils.api_types import ( + ArrowCompressionCodec, + FurnaceSqlDialect, + Ref, + SqlDialect, + SQLReturnType, + assert_in_literal, +) if TYPE_CHECKING: import pandas as pd @@ -318,9 +325,10 @@ def query_foundry_sql( query: str, return_type: Literal["pandas"], branch: Ref = ..., - sql_dialect: SqlDialect = ..., + sql_dialect: FurnaceSqlDialect = ..., arrow_compression_codec: ArrowCompressionCodec = ..., timeout: int = ..., + experimental_use_trino: bool = ..., ) -> pd.core.frame.DataFrame: ... @overload @@ -329,9 +337,10 @@ def query_foundry_sql( query: str, return_type: Literal["polars"], branch: Ref = ..., - sql_dialect: SqlDialect = ..., + sql_dialect: FurnaceSqlDialect = ..., arrow_compression_codec: ArrowCompressionCodec = ..., timeout: int = ..., + experimental_use_trino: bool = ..., ) -> pl.DataFrame: ... @overload @@ -340,9 +349,10 @@ def query_foundry_sql( query: str, return_type: Literal["spark"], branch: Ref = ..., - sql_dialect: SqlDialect = ..., + sql_dialect: FurnaceSqlDialect = ..., arrow_compression_codec: ArrowCompressionCodec = ..., timeout: int = ..., + experimental_use_trino: bool = ..., ) -> pyspark.sql.DataFrame: ... @overload @@ -351,9 +361,10 @@ def query_foundry_sql( query: str, return_type: Literal["arrow"], branch: Ref = ..., - sql_dialect: SqlDialect = ..., + sql_dialect: FurnaceSqlDialect = ..., arrow_compression_codec: ArrowCompressionCodec = ..., timeout: int = ..., + experimental_use_trino: bool = ..., ) -> pa.Table: ... @overload @@ -362,9 +373,10 @@ def query_foundry_sql( query: str, return_type: SQLReturnType = ..., branch: Ref = ..., - sql_dialect: SqlDialect = ..., + sql_dialect: FurnaceSqlDialect = ..., arrow_compression_codec: ArrowCompressionCodec = ..., timeout: int = ..., + experimental_use_trino: bool = ..., ) -> tuple[dict, list[list]] | pd.core.frame.DataFrame | pl.DataFrame | pa.Table | pyspark.sql.DataFrame: ... def query_foundry_sql( @@ -372,9 +384,10 @@ def query_foundry_sql( query: str, return_type: SQLReturnType = "pandas", branch: Ref = "master", - sql_dialect: SqlDialect = "SPARK", + sql_dialect: FurnaceSqlDialect = "SPARK", arrow_compression_codec: ArrowCompressionCodec = "NONE", timeout: int = 600, + experimental_use_trino: bool = False, ) -> tuple[dict, list[list]] | pd.core.frame.DataFrame | pl.DataFrame | pa.Table | pyspark.sql.DataFrame: """Queries the Foundry SQL server using the V2 API. @@ -389,9 +402,10 @@ def query_foundry_sql( query: The SQL Query return_type: See :py:class:foundry_dev_tools.foundry_api_client.SQLReturnType branch: The dataset branch to query - sql_dialect: The SQL dialect to use + sql_dialect: The SQL dialect to use (only SPARK is supported for V2) arrow_compression_codec: Arrow compression codec (NONE, LZ4, ZSTD) timeout: Query timeout in seconds + experimental_use_trino: If True, modifies the query to use Trino backend by adding /*+ backend(trino) */ hint Returns: :external+pandas:py:class:`~pandas.DataFrame` | :external+polars:py:class:`~polars.DataFrame` | :external+pyarrow:py:class:`~pyarrow.Table` | :external+spark:py:class:`~pyspark.sql.DataFrame`: @@ -403,6 +417,11 @@ def query_foundry_sql( FoundrySqlQueryClientTimedOutError: If the query times out """ # noqa: E501 + assert_in_literal(sql_dialect, FurnaceSqlDialect, "sql_dialect") + + if experimental_use_trino: + query = query.replace("SELECT ", "SELECT /*+ backend(trino) */ ", 1) + response_json = self.api_query( query=query, dialect=sql_dialect, branch=branch, arrow_compression_codec=arrow_compression_codec ).json() @@ -507,7 +526,7 @@ def read_stream_results_arrow(self, ticket: dict[str, Any]) -> pa.ipc.RecordBatc def api_query( self, query: str, - dialect: SqlDialect, + dialect: FurnaceSqlDialect, branch: Ref, arrow_compression_codec: ArrowCompressionCodec = "NONE", **kwargs, @@ -516,7 +535,7 @@ def api_query( Args: query: The SQL query string - dialect: The SQL dialect to use + dialect: The SQL dialect to use (only SPARK is supported) branch: The dataset branch to query arrow_compression_codec: Arrow compression codec (NONE, LZ4, ZSTD) **kwargs: gets passed to :py:meth:`APIClient.api_request` diff --git a/libs/foundry-dev-tools/src/foundry_dev_tools/utils/api_types.py b/libs/foundry-dev-tools/src/foundry_dev_tools/utils/api_types.py index 99f486a1..eae34584 100644 --- a/libs/foundry-dev-tools/src/foundry_dev_tools/utils/api_types.py +++ b/libs/foundry-dev-tools/src/foundry_dev_tools/utils/api_types.py @@ -95,6 +95,9 @@ def assert_in_literal(option, literal, variable_name) -> None: # noqa: ANN001 SqlDialect = Literal["ANSI", "SPARK"] """The SQL Dialect for Foundry SQL queries.""" +FurnaceSqlDialect = Literal["SPARK"] +"""The SQL Dialect for Furnace SQL queries (V2 API). Only SPARK is supported.""" + ArrowCompressionCodec = Literal["NONE", "LZ4", "ZSTD"] """The Arrow compression codec for Foundry SQL queries.""" diff --git a/tests/integration/clients/test_foundry_sql_server.py b/tests/integration/clients/test_foundry_sql_server.py index c832d88c..98135734 100644 --- a/tests/integration/clients/test_foundry_sql_server.py +++ b/tests/integration/clients/test_foundry_sql_server.py @@ -73,6 +73,25 @@ def test_legacy_fallback(mocker): query_foundry_sql_legacy_spy.assert_called() +def test_v1_ansi_sql_dialect(): + """Test V1 client with ANSI SQL dialect (uses double quotes instead of backticks).""" + # Test basic query with ANSI dialect - note the use of double quotes instead of backticks + result = TEST_SINGLETON.ctx.foundry_sql_server.query_foundry_sql( + query=f'SELECT sepal_width, sepal_length FROM "{TEST_SINGLETON.iris_new.rid}" LIMIT 5', + sql_dialect="ANSI", + ) + assert result.shape[0] == 5 + assert result.shape[1] == 2 + + # Test with aggregation using ANSI dialect + result_agg = TEST_SINGLETON.ctx.foundry_sql_server.query_foundry_sql( + query=f'SELECT COUNT(*) as cnt FROM "{TEST_SINGLETON.iris_new.rid}"', + sql_dialect="ANSI", + ) + assert result_agg.shape[0] == 1 + assert "cnt" in result_agg.columns + + # V2 Client Tests @@ -176,3 +195,73 @@ def test_v2_polars_return_type(): assert isinstance(polars_df, pl.DataFrame) assert polars_df.height == 2 assert polars_df.width == 1 + + +def test_v2_polars_parquet(): + polars_df = TEST_SINGLETON.ctx.foundry_sql_server_v2.query_foundry_sql( + f"SELECT sepal_length FROM `{TEST_SINGLETON.iris_parquet.rid}` LIMIT 2", + return_type="polars", + ) + assert isinstance(polars_df, pl.DataFrame) + assert polars_df.height == 2 + assert polars_df.width == 1 + + polars_df = TEST_SINGLETON.ctx.foundry_sql_server_v2.query_foundry_sql( + f"SELECT sepal_length FROM `{TEST_SINGLETON.iris_parquet.rid}` LIMIT 2", + return_type="polars", + experimental_use_trino=True, + ) + assert isinstance(polars_df, pl.DataFrame) + assert polars_df.height == 2 + assert polars_df.width == 1 + + +def test_v2_polars_parquet_hive_partitioning(): + polars_df = TEST_SINGLETON.ctx.foundry_sql_server_v2.query_foundry_sql( + f"SELECT sepal_length FROM `{TEST_SINGLETON.iris_hive_partitioned.rid}` LIMIT 2", + return_type="polars", + experimental_use_trino=True, + ) + assert isinstance(polars_df, pl.DataFrame) + assert polars_df.height == 2 + assert polars_df.width == 1 + + polars_df = TEST_SINGLETON.ctx.foundry_sql_server_v2.query_foundry_sql( + f"SELECT sepal_length FROM `{TEST_SINGLETON.iris_hive_partitioned.rid}` LIMIT 2", return_type="polars" + ) + assert isinstance(polars_df, pl.DataFrame) + assert polars_df.height == 2 + assert polars_df.width == 1 + + +def test_v2_arrow_compression_codecs(): + """Test V2 client with different arrow compression codecs.""" + # Test with LZ4 compression + result_lz4 = TEST_SINGLETON.ctx.foundry_sql_server_v2.query_foundry_sql( + query=f"SELECT * FROM `{TEST_SINGLETON.iris_new.rid}` LIMIT 10", + arrow_compression_codec="LZ4", + ) + assert result_lz4.shape[0] == 10 + assert result_lz4.shape[1] == 5 + + # Test with ZSTD compression + result_zstd = TEST_SINGLETON.ctx.foundry_sql_server_v2.query_foundry_sql( + query=f"SELECT * FROM `{TEST_SINGLETON.iris_new.rid}` LIMIT 10", + arrow_compression_codec="ZSTD", + ) + assert result_zstd.shape[0] == 10 + assert result_zstd.shape[1] == 5 + + # Test with NONE compression (default) + result_none = TEST_SINGLETON.ctx.foundry_sql_server_v2.query_foundry_sql( + query=f"SELECT * FROM `{TEST_SINGLETON.iris_new.rid}` LIMIT 10", + arrow_compression_codec="NONE", + ) + assert result_none.shape[0] == 10 + assert result_none.shape[1] == 5 + + # Verify all results have the same data + import pandas as pd + + pd.testing.assert_frame_equal(result_lz4, result_zstd) + pd.testing.assert_frame_equal(result_lz4, result_none) diff --git a/tests/unit/clients/test_foundry_sql_server.py b/tests/unit/clients/test_foundry_sql_server.py index 6f0da5cf..b8c6ee8a 100644 --- a/tests/unit/clients/test_foundry_sql_server.py +++ b/tests/unit/clients/test_foundry_sql_server.py @@ -121,3 +121,106 @@ def test_exception_unknown_json(mocker, test_context_mock): timeout=0.001, ) assert exception.value.error_message == "" + + +def test_v2_experimental_use_trino(mocker, test_context_mock): + """Test that experimental_use_trino parameter modifies the query correctly.""" + import pandas as pd + + mocker.patch("time.sleep") # we do not want to wait in tests + + # Mock the arrow stream reader to return a simple pandas DataFrame + mock_arrow_reader = mocker.MagicMock() + mock_arrow_reader.read_pandas.return_value = pd.DataFrame({"col1": [1, 2, 3]}) + mocker.patch.object( + test_context_mock.foundry_sql_server_v2, + "read_stream_results_arrow", + return_value=mock_arrow_reader, + ) + + # Mock the api_query endpoint (initial query execution) + query_matcher = mocker.MagicMock() + test_context_mock.mock_adapter.register_uri( + "POST", + build_api_url(TEST_HOST.url, "foundry-sql-server", "sql-endpoint/v1/queries/query"), + json={"type": "running", "running": {"queryHandle": {"queryId": "test-query-id-123", "type": "foundry"}}}, + additional_matcher=query_matcher, + ) + + # Mock the api_status endpoint (poll for completion - returns ready immediately) + test_context_mock.mock_adapter.register_uri( + "POST", + build_api_url(TEST_HOST.url, "foundry-sql-server", "sql-endpoint/v1/queries/status"), + json={ + "status": { + "type": "ready", + "ready": {"tickets": [{"tickets": ["eyJhbGc...mock-ticket-1", "eyJhbGc...mock-ticket-2"]}]}, + } + }, + ) + + # Test with experimental_use_trino=True + df = test_context_mock.foundry_sql_server_v2.query_foundry_sql( + "SELECT * FROM `ri.foundry.main.dataset.test-dataset`", + experimental_use_trino=True, + ) + + # Verify the query was modified to include the Trino backend hint + call_args = query_matcher.call_args_list[0] + request = call_args[0][0] + request_json = request.json() + + assert "SELECT /*+ backend(trino) */ * FROM" in request_json["querySpec"]["query"] + assert df.shape[0] == 3 + + # Reset for second test + query_matcher.reset_mock() + + # Test with experimental_use_trino=False (default) + df = test_context_mock.foundry_sql_server_v2.query_foundry_sql( + "SELECT * FROM `ri.foundry.main.dataset.test-dataset`", + experimental_use_trino=False, + ) + + # Verify the query was NOT modified + call_args = query_matcher.call_args_list[0] + request = call_args[0][0] + request_json = request.json() + + assert request_json["querySpec"]["query"] == "SELECT * FROM `ri.foundry.main.dataset.test-dataset`" + assert "backend(trino)" not in request_json["querySpec"]["query"] + assert df.shape[0] == 3 + + +def test_v2_poll_for_query_completion_timeout(mocker, test_context_mock): + """Test that V2 query times out correctly when polling takes too long.""" + mocker.patch("time.sleep") # we do not want to wait in tests + + # Mock the api_query endpoint (initial query execution) + test_context_mock.mock_adapter.register_uri( + "POST", + build_api_url(TEST_HOST.url, "foundry-sql-server", "sql-endpoint/v1/queries/query"), + json={"type": "running", "running": {"queryHandle": {"queryId": "test-query-timeout-123", "type": "foundry"}}}, + ) + + # Mock the api_status endpoint to always return running status + test_context_mock.mock_adapter.register_uri( + "POST", + build_api_url(TEST_HOST.url, "foundry-sql-server", "sql-endpoint/v1/queries/status"), + json={"status": {"type": "running", "running": {}}}, + ) + + with pytest.raises(FoundrySqlQueryClientTimedOutError): + test_context_mock.foundry_sql_server_v2.query_foundry_sql( + "SELECT * FROM `ri.foundry.main.dataset.test-dataset`", + timeout=0.001, + ) + + +def test_v2_ansi_dialect_not_supported(test_context_mock): + """Test that V2 client rejects ANSI SQL dialect.""" + with pytest.raises(TypeError, match="'ANSI' is not a valid option for sql_dialect"): + test_context_mock.foundry_sql_server_v2.query_foundry_sql( + "SELECT * FROM `ri.foundry.main.dataset.test-dataset`", + sql_dialect="ANSI", # type: ignore[arg-type] + ) From 5758e14918c454a57936dcb7834476c70f1586a0 Mon Sep 17 00:00:00 2001 From: nicornk Date: Fri, 20 Feb 2026 10:52:43 +0100 Subject: [PATCH 07/12] remove logging --- .../src/foundry_dev_tools/clients/foundry_sql_server.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/libs/foundry-dev-tools/src/foundry_dev_tools/clients/foundry_sql_server.py b/libs/foundry-dev-tools/src/foundry_dev_tools/clients/foundry_sql_server.py index f613e775..6b26aa3e 100644 --- a/libs/foundry-dev-tools/src/foundry_dev_tools/clients/foundry_sql_server.py +++ b/libs/foundry-dev-tools/src/foundry_dev_tools/clients/foundry_sql_server.py @@ -2,7 +2,6 @@ from __future__ import annotations -import logging import time import warnings from typing import TYPE_CHECKING, Any, Literal, overload @@ -30,8 +29,6 @@ import pyspark import requests -LOGGER = logging.getLogger(__name__) - class FoundrySqlServerClient(APIClient): """FoundrySqlServerClient class that implements methods from the 'foundry-sql-server' API.""" From ce90696c4d90f1373ba8a518492f7ffb9b58d41d Mon Sep 17 00:00:00 2001 From: nicornk Date: Fri, 20 Feb 2026 10:57:26 +0100 Subject: [PATCH 08/12] fix copilot comments --- .../clients/foundry_sql_server.py | 30 +++++++++++++------ .../clients/test_foundry_sql_server.py | 6 ++-- tests/unit/clients/test_foundry_sql_server.py | 11 ++++++- 3 files changed, 34 insertions(+), 13 deletions(-) diff --git a/libs/foundry-dev-tools/src/foundry_dev_tools/clients/foundry_sql_server.py b/libs/foundry-dev-tools/src/foundry_dev_tools/clients/foundry_sql_server.py index 6b26aa3e..2e7975a0 100644 --- a/libs/foundry-dev-tools/src/foundry_dev_tools/clients/foundry_sql_server.py +++ b/libs/foundry-dev-tools/src/foundry_dev_tools/clients/foundry_sql_server.py @@ -368,24 +368,24 @@ def query_foundry_sql( def query_foundry_sql( self, query: str, - return_type: SQLReturnType = ..., + return_type: Literal["pandas", "polars", "spark", "arrow"] = ..., branch: Ref = ..., sql_dialect: FurnaceSqlDialect = ..., arrow_compression_codec: ArrowCompressionCodec = ..., timeout: int = ..., experimental_use_trino: bool = ..., - ) -> tuple[dict, list[list]] | pd.core.frame.DataFrame | pl.DataFrame | pa.Table | pyspark.sql.DataFrame: ... + ) -> pd.core.frame.DataFrame | pl.DataFrame | pa.Table | pyspark.sql.DataFrame: ... def query_foundry_sql( self, query: str, - return_type: SQLReturnType = "pandas", + return_type: Literal["pandas", "polars", "spark", "arrow"] = "pandas", branch: Ref = "master", sql_dialect: FurnaceSqlDialect = "SPARK", arrow_compression_codec: ArrowCompressionCodec = "NONE", timeout: int = 600, experimental_use_trino: bool = False, - ) -> tuple[dict, list[list]] | pd.core.frame.DataFrame | pl.DataFrame | pa.Table | pyspark.sql.DataFrame: + ) -> pd.core.frame.DataFrame | pl.DataFrame | pa.Table | pyspark.sql.DataFrame: """Queries the Foundry SQL server using the V2 API. Uses Arrow IPC to communicate with the Foundry SQL Server Endpoint. @@ -397,7 +397,7 @@ def query_foundry_sql( Args: query: The SQL Query - return_type: See :py:class:foundry_dev_tools.foundry_api_client.SQLReturnType + return_type: The return type (pandas, polars, spark, or arrow). Note: "raw" is not supported in V2. branch: The dataset branch to query sql_dialect: The SQL dialect to use (only SPARK is supported for V2) arrow_compression_codec: Arrow compression codec (NONE, LZ4, ZSTD) @@ -414,13 +414,15 @@ def query_foundry_sql( FoundrySqlQueryClientTimedOutError: If the query times out """ # noqa: E501 - assert_in_literal(sql_dialect, FurnaceSqlDialect, "sql_dialect") - if experimental_use_trino: query = query.replace("SELECT ", "SELECT /*+ backend(trino) */ ", 1) response_json = self.api_query( - query=query, dialect=sql_dialect, branch=branch, arrow_compression_codec=arrow_compression_codec + query=query, + dialect=sql_dialect, + branch=branch, + arrow_compression_codec=arrow_compression_codec, + timeout=timeout, ).json() query_handle = self._extract_query_handle(response_json) @@ -466,7 +468,11 @@ def query_foundry_sql( if return_type == "arrow": return arrow_stream_reader.read_all() - raise ValueError("The following return_type is not supported: " + return_type) + msg = ( + f"Unsupported return_type: {return_type}. " + f"V2 API supports: pandas, polars, spark, arrow (raw is not supported)" + ) + raise ValueError(msg) def _extract_query_handle(self, response_json: dict[str, Any]) -> dict[str, Any]: """Extract query handle from execute response. @@ -526,6 +532,7 @@ def api_query( dialect: FurnaceSqlDialect, branch: Ref, arrow_compression_codec: ArrowCompressionCodec = "NONE", + timeout: int = 600, **kwargs, ) -> requests.Response: """Execute a SQL query via the V2 API. @@ -535,12 +542,16 @@ def api_query( dialect: The SQL dialect to use (only SPARK is supported) branch: The dataset branch to query arrow_compression_codec: Arrow compression codec (NONE, LZ4, ZSTD) + timeout: Query timeout in seconds (used for error context) **kwargs: gets passed to :py:meth:`APIClient.api_request` Returns: Response with query handle and initial status """ + assert_in_literal(dialect, FurnaceSqlDialect, "dialect") + assert_in_literal(arrow_compression_codec, ArrowCompressionCodec, "arrow_compression_codec") + return self.api_request( "POST", "sql-endpoint/v1/queries/query", @@ -557,6 +568,7 @@ def api_query( "resultMode": "AUTO", }, }, + error_handling=ErrorHandlingConfig(branch=branch, dialect=dialect, timeout=timeout), **kwargs, ) diff --git a/tests/integration/clients/test_foundry_sql_server.py b/tests/integration/clients/test_foundry_sql_server.py index 98135734..d4a7b99a 100644 --- a/tests/integration/clients/test_foundry_sql_server.py +++ b/tests/integration/clients/test_foundry_sql_server.py @@ -134,10 +134,10 @@ def test_v2_return_type_arrow(): def test_v2_return_type_raw_not_supported(): """Test V2 client with raw return type.""" - with pytest.raises(ValueError, match="The following return_type is not supported: .+"): - schema, rows = TEST_SINGLETON.ctx.foundry_sql_server_v2.query_foundry_sql( + with pytest.raises(ValueError, match="Unsupported return_type: raw"): + TEST_SINGLETON.ctx.foundry_sql_server_v2.query_foundry_sql( query=f"SELECT sepal_width, sepal_length FROM `{TEST_SINGLETON.iris_new.rid}` LIMIT 3", - return_type="raw", + return_type="raw", # type: ignore[arg-type] ) diff --git a/tests/unit/clients/test_foundry_sql_server.py b/tests/unit/clients/test_foundry_sql_server.py index b8c6ee8a..54c78993 100644 --- a/tests/unit/clients/test_foundry_sql_server.py +++ b/tests/unit/clients/test_foundry_sql_server.py @@ -219,8 +219,17 @@ def test_v2_poll_for_query_completion_timeout(mocker, test_context_mock): def test_v2_ansi_dialect_not_supported(test_context_mock): """Test that V2 client rejects ANSI SQL dialect.""" - with pytest.raises(TypeError, match="'ANSI' is not a valid option for sql_dialect"): + with pytest.raises(TypeError, match="'ANSI' is not a valid option for dialect"): test_context_mock.foundry_sql_server_v2.query_foundry_sql( "SELECT * FROM `ri.foundry.main.dataset.test-dataset`", sql_dialect="ANSI", # type: ignore[arg-type] ) + + +def test_v2_invalid_compression_codec(test_context_mock): + """Test that V2 client rejects invalid arrow compression codec.""" + with pytest.raises(TypeError, match="'INVALID' is not a valid option for arrow_compression_codec"): + test_context_mock.foundry_sql_server_v2.query_foundry_sql( + "SELECT * FROM `ri.foundry.main.dataset.test-dataset`", + arrow_compression_codec="INVALID", # type: ignore[arg-type] + ) From d8d6ac1d87411cafde620a664aa5367ed636804c Mon Sep 17 00:00:00 2001 From: nicornk Date: Fri, 20 Feb 2026 13:42:07 +0100 Subject: [PATCH 09/12] fix co-pilot comments --- .../clients/foundry_sql_server.py | 54 ++++++++++++++----- .../clients/test_foundry_sql_server.py | 19 +++++++ 2 files changed, 59 insertions(+), 14 deletions(-) diff --git a/libs/foundry-dev-tools/src/foundry_dev_tools/clients/foundry_sql_server.py b/libs/foundry-dev-tools/src/foundry_dev_tools/clients/foundry_sql_server.py index 2e7975a0..7cba7c96 100644 --- a/libs/foundry-dev-tools/src/foundry_dev_tools/clients/foundry_sql_server.py +++ b/libs/foundry-dev-tools/src/foundry_dev_tools/clients/foundry_sql_server.py @@ -412,10 +412,15 @@ def query_foundry_sql( Raises: FoundrySqlQueryFailedError: If the query fails FoundrySqlQueryClientTimedOutError: If the query times out + TypeError: If an invalid sql_dialect or arrow_compression_codec is provided + ValueError: If an unsupported return_type is provided """ # noqa: E501 if experimental_use_trino: - query = query.replace("SELECT ", "SELECT /*+ backend(trino) */ ", 1) + # Case-insensitive replacement of first SELECT keyword + import re + + query = re.sub(r"\bSELECT\b", "SELECT /*+ backend(trino) */", query, count=1, flags=re.IGNORECASE) response_json = self.api_query( query=query, @@ -428,7 +433,6 @@ def query_foundry_sql( query_handle = self._extract_query_handle(response_json) start_time = time.time() - # Poll for completion while response_json.get("status", {}).get("type") != "ready": time.sleep(0.2) response = self.api_status(query_handle) @@ -439,20 +443,14 @@ def query_foundry_sql( if time.time() > start_time + timeout: raise FoundrySqlQueryClientTimedOutError(response, timeout=timeout) - # Extract tickets from successful response ticket = self._extract_ticket(response_json) - # Fetch Arrow data using tickets arrow_stream_reader = self.read_stream_results_arrow(ticket) if return_type == "pandas": return arrow_stream_reader.read_pandas() if return_type == "polars": - # The FakeModule implementation used in the _optional packages - # throws an ImportError when trying to access attributes of the module. - # This ImportError is caught below to fall back to query_foundry_sql_legacy - # which will again raise an ImportError when polars is not installed. from foundry_dev_tools._optional.polars import pl arrow_table = arrow_stream_reader.read_all() @@ -484,8 +482,26 @@ def _extract_query_handle(self, response_json: dict[str, Any]) -> dict[str, Any] Returns: Query handle dict + Raises: + KeyError: If the response JSON doesn't contain the expected structure + """ - return response_json[response_json["type"]]["queryHandle"] + response_type = response_json.get("type") + if not response_type: + msg = f"Response JSON missing 'type' field. Response: {response_json}" + raise KeyError(msg) + + type_data = response_json.get(response_type) + if not type_data: + msg = f"Response JSON missing '{response_type}' field. Response: {response_json}" + raise KeyError(msg) + + query_handle = type_data.get("queryHandle") + if not query_handle: + msg = f"Response JSON missing 'queryHandle' in '{response_type}'. Response: {response_json}" + raise KeyError(msg) + + return query_handle def _extract_ticket(self, response_json: dict[str, Any]) -> dict[str, Any]: """Extract tickets from success response. @@ -496,16 +512,26 @@ def _extract_ticket(self, response_json: dict[str, Any]) -> dict[str, Any]: Returns: List of tickets for fetching results + Raises: + KeyError: If the response JSON doesn't contain the expected structure + """ + try: + status = response_json["status"] + ready = status["ready"] + ticket_groups = ready["tickets"] + except KeyError as exc: + msg = ( + f"Response JSON missing expected structure. " + f"Expected path: status.ready.tickets. Response: {response_json}" + ) + raise KeyError(msg) from exc + # we combine all tickets into one to get the full data # if performance is a concern this should be done in parallel return { "id": 0, - "tickets": [ - ticket - for ticket_group in response_json["status"]["ready"]["tickets"] - for ticket in ticket_group["tickets"] - ], + "tickets": [ticket for ticket_group in ticket_groups for ticket in ticket_group["tickets"]], "type": "furnace", } diff --git a/tests/integration/clients/test_foundry_sql_server.py b/tests/integration/clients/test_foundry_sql_server.py index d4a7b99a..2cdd6bec 100644 --- a/tests/integration/clients/test_foundry_sql_server.py +++ b/tests/integration/clients/test_foundry_sql_server.py @@ -265,3 +265,22 @@ def test_v2_arrow_compression_codecs(): pd.testing.assert_frame_equal(result_lz4, result_zstd) pd.testing.assert_frame_equal(result_lz4, result_none) + + +def test_v2_trino_engine_in_response(mocker): + """Test that when experimental_use_trino=True, the API response indicates trino engine.""" + # Spy on the api_query method to capture the initial response + api_query_spy = mocker.spy(TEST_SINGLETON.ctx.foundry_sql_server_v2, "api_query") + + # Execute query with trino enabled using parquet dataset (trino works with parquet) + result = TEST_SINGLETON.ctx.foundry_sql_server_v2.query_foundry_sql( + query=f"SELECT sepal_length FROM `{TEST_SINGLETON.iris_parquet.rid}` LIMIT 1", + experimental_use_trino=True, + ) + + assert result.shape == (1, 1) + + # Verify the API response indicates TRINO backend + response_json = api_query_spy.spy_return.json() + backend = response_json[response_json["type"]]["queryStructure"]["metadata"]["backend"] + assert backend == "TRINO" From 3b1dca079d917629dc8136b44fbbb7808e5562f5 Mon Sep 17 00:00:00 2001 From: nicornk Date: Fri, 20 Feb 2026 13:54:30 +0100 Subject: [PATCH 10/12] docstring --- .../src/foundry_dev_tools/clients/foundry_sql_server.py | 8 ++++---- tests/integration/clients/test_foundry_sql_server.py | 1 + 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/libs/foundry-dev-tools/src/foundry_dev_tools/clients/foundry_sql_server.py b/libs/foundry-dev-tools/src/foundry_dev_tools/clients/foundry_sql_server.py index 7cba7c96..db80b629 100644 --- a/libs/foundry-dev-tools/src/foundry_dev_tools/clients/foundry_sql_server.py +++ b/libs/foundry-dev-tools/src/foundry_dev_tools/clients/foundry_sql_server.py @@ -326,7 +326,7 @@ def query_foundry_sql( arrow_compression_codec: ArrowCompressionCodec = ..., timeout: int = ..., experimental_use_trino: bool = ..., - ) -> pd.core.frame.DataFrame: ... + ) -> pd.DataFrame: ... @overload def query_foundry_sql( @@ -374,7 +374,7 @@ def query_foundry_sql( arrow_compression_codec: ArrowCompressionCodec = ..., timeout: int = ..., experimental_use_trino: bool = ..., - ) -> pd.core.frame.DataFrame | pl.DataFrame | pa.Table | pyspark.sql.DataFrame: ... + ) -> pd.DataFrame | pl.DataFrame | pa.Table | pyspark.sql.DataFrame: ... def query_foundry_sql( self, @@ -385,7 +385,7 @@ def query_foundry_sql( arrow_compression_codec: ArrowCompressionCodec = "NONE", timeout: int = 600, experimental_use_trino: bool = False, - ) -> pd.core.frame.DataFrame | pl.DataFrame | pa.Table | pyspark.sql.DataFrame: + ) -> pd.DataFrame | pl.DataFrame | pa.Table | pyspark.sql.DataFrame: """Queries the Foundry SQL server using the V2 API. Uses Arrow IPC to communicate with the Foundry SQL Server Endpoint. @@ -510,7 +510,7 @@ def _extract_ticket(self, response_json: dict[str, Any]) -> dict[str, Any]: response_json: Success response JSON from status API Returns: - List of tickets for fetching results + Ticket dict with id, tickets list, and type. Example: {"id": 0, "tickets": [...], "type": "furnace"} Raises: KeyError: If the response JSON doesn't contain the expected structure diff --git a/tests/integration/clients/test_foundry_sql_server.py b/tests/integration/clients/test_foundry_sql_server.py index 2cdd6bec..f1b22f1c 100644 --- a/tests/integration/clients/test_foundry_sql_server.py +++ b/tests/integration/clients/test_foundry_sql_server.py @@ -168,6 +168,7 @@ def test_v2_disable_arrow_compression(): """Test V2 client with arrow compression disabled.""" result = TEST_SINGLETON.ctx.foundry_sql_server_v2.query_foundry_sql( query=f"SELECT * FROM `{TEST_SINGLETON.iris_new.rid}` LIMIT 5", + arrow_compression_codec="NONE", ) assert result.shape[0] == 5 From c07b31b7e3b5e2fddebd6bc2c32f87a5af022bb7 Mon Sep 17 00:00:00 2001 From: nicornk Date: Fri, 20 Feb 2026 15:26:55 +0100 Subject: [PATCH 11/12] improve error logging --- .../src/foundry_dev_tools/errors/sql.py | 38 +++++++++- tests/unit/clients/test_foundry_sql_server.py | 70 +++++++++++++++++++ 2 files changed, 106 insertions(+), 2 deletions(-) diff --git a/libs/foundry-dev-tools/src/foundry_dev_tools/errors/sql.py b/libs/foundry-dev-tools/src/foundry_dev_tools/errors/sql.py index 567cd5d9..82c054af 100644 --- a/libs/foundry-dev-tools/src/foundry_dev_tools/errors/sql.py +++ b/libs/foundry-dev-tools/src/foundry_dev_tools/errors/sql.py @@ -5,6 +5,7 @@ from typing import TYPE_CHECKING from foundry_dev_tools.errors.meta import FoundryAPIError +from foundry_dev_tools.utils.misc import decamelize if TYPE_CHECKING: import requests @@ -16,8 +17,41 @@ class FoundrySqlQueryFailedError(FoundryAPIError): message = "Foundry SQL Query Failed." def __init__(self, response: requests.Response): - self.error_message = response.json().get("status", {}).get("failed", {}).get("errorMessage", "") - super().__init__(response=response, info=self.error_message) + kwargs = {} + info = "" + + try: + response_json = response.json() + failed_data = response_json.get("status", {}).get("failed", {}) + + # Try to extract V2 error structure with rich parameters + if error_code := failed_data.get("errorCode"): + kwargs["error_code"] = error_code + if error_name := failed_data.get("errorName"): + kwargs["error_name"] = error_name + if error_instance_id := failed_data.get("errorInstanceId"): + kwargs["error_instance_id"] = error_instance_id + + # Extract all parameters and convert camelCase to snake_case + if parameters := failed_data.get("parameters"): + for key, value in parameters.items(): + kwargs[decamelize(key)] = value + + # Prefer userFriendlyMessage as the info text + info = parameters.get("userFriendlyMessage", "") + + # Fall back to V1 errorMessage if userFriendlyMessage not available + if not info: + info = failed_data.get("errorMessage", "") + + # Store legacy error_message attribute for backward compatibility + self.error_message = info + + except Exception: # noqa: BLE001 + # If any error occurs during extraction, fall back to empty + self.error_message = "" + + super().__init__(response=response, info=info, **kwargs) class FurnaceSqlSqlParseError(FoundryAPIError): diff --git a/tests/unit/clients/test_foundry_sql_server.py b/tests/unit/clients/test_foundry_sql_server.py index 54c78993..b93c2ef0 100644 --- a/tests/unit/clients/test_foundry_sql_server.py +++ b/tests/unit/clients/test_foundry_sql_server.py @@ -233,3 +233,73 @@ def test_v2_invalid_compression_codec(test_context_mock): "SELECT * FROM `ri.foundry.main.dataset.test-dataset`", arrow_compression_codec="INVALID", # type: ignore[arg-type] ) + + +def test_v2_query_failed_error_details(mocker, test_context_mock): + """Test that V2 error responses with rich parameters are properly extracted.""" + mocker.patch("time.sleep") + + # Mock the api_query endpoint (initial query execution) + test_context_mock.mock_adapter.register_uri( + "POST", + build_api_url(TEST_HOST.url, "foundry-sql-server", "sql-endpoint/v1/queries/query"), + json={"type": "running", "running": {"queryHandle": {"queryId": "test-query-id", "type": "foundry"}}}, + ) + + # Mock the api_status endpoint with V2 error structure containing rich parameters + test_context_mock.mock_adapter.register_uri( + "POST", + build_api_url(TEST_HOST.url, "foundry-sql-server", "sql-endpoint/v1/queries/status"), + json={ + "status": { + "type": "failed", + "failed": { + "errorCode": "INVALID_ARGUMENT", + "errorName": "SqlQueryService:SqlSyntaxError", + "errorInstanceId": "c16cb2b7-01ec-42a9-9ee2-0e57e2aed4ba", + "parameters": { + "endLine": 1, + "endColumn": 15350, + "dialect": "SPARK", + "queryFragment": "", + "startColumn": 15340, + "startLine": 1, + "userFriendlyMessage": ( + "From line 1, column 15340 to line 1, column 15350: " + "Column 'COLUMN_NAME' not found in table 'my_table'; did you mean 'column_name'?" + ), + }, + }, + } + }, + ) + + with pytest.raises(FoundrySqlQueryFailedError) as exception: + test_context_mock.foundry_sql_server_v2.query_foundry_sql( + "SELECT COLUMN_NAME FROM `ri.foundry.main.dataset.test-dataset`", + ) + + # Verify all error parameters are extracted and accessible + assert exception.value.error_code == "INVALID_ARGUMENT" + assert exception.value.error_name == "SqlQueryService:SqlSyntaxError" + assert exception.value.error_instance_id == "c16cb2b7-01ec-42a9-9ee2-0e57e2aed4ba" + + # Verify parameters are converted from camelCase to snake_case and accessible + assert exception.value.start_line == 1 + assert exception.value.end_line == 1 + assert exception.value.start_column == 15340 + assert exception.value.end_column == 15350 + assert exception.value.dialect == "SPARK" + # query_fragment is in kwargs even if empty + assert "query_fragment" in exception.value.kwargs + + # Verify userFriendlyMessage is used as the info text and accessible + assert exception.value.user_friendly_message == ( + "From line 1, column 15340 to line 1, column 15350: " + "Column 'COLUMN_NAME' not found in table 'my_table'; did you mean 'column_name'?" + ) + + # Verify the exception message string includes the user-friendly message + exception_str = str(exception.value) + assert "COLUMN_NAME" in exception_str + assert "my_table" in exception_str From 6eb0de2e2579f5661bcfb0e388ae11dd8a39d832 Mon Sep 17 00:00:00 2001 From: nicornk Date: Fri, 20 Feb 2026 15:55:43 +0100 Subject: [PATCH 12/12] improve exception context --- .../clients/foundry_sql_server.py | 2 +- .../src/foundry_dev_tools/errors/sql.py | 5 +- tests/unit/clients/test_foundry_sql_server.py | 51 +++++++++++++++++++ 3 files changed, 56 insertions(+), 2 deletions(-) diff --git a/libs/foundry-dev-tools/src/foundry_dev_tools/clients/foundry_sql_server.py b/libs/foundry-dev-tools/src/foundry_dev_tools/clients/foundry_sql_server.py index db80b629..699f4bda 100644 --- a/libs/foundry-dev-tools/src/foundry_dev_tools/clients/foundry_sql_server.py +++ b/libs/foundry-dev-tools/src/foundry_dev_tools/clients/foundry_sql_server.py @@ -439,7 +439,7 @@ def query_foundry_sql( response_json = response.json() if response_json.get("status", {}).get("type") == "failed": - raise FoundrySqlQueryFailedError(response) + raise FoundrySqlQueryFailedError(response, query=query, branch=branch, dialect=sql_dialect) if time.time() > start_time + timeout: raise FoundrySqlQueryClientTimedOutError(response, timeout=timeout) diff --git a/libs/foundry-dev-tools/src/foundry_dev_tools/errors/sql.py b/libs/foundry-dev-tools/src/foundry_dev_tools/errors/sql.py index 82c054af..dd5e89f4 100644 --- a/libs/foundry-dev-tools/src/foundry_dev_tools/errors/sql.py +++ b/libs/foundry-dev-tools/src/foundry_dev_tools/errors/sql.py @@ -16,7 +16,7 @@ class FoundrySqlQueryFailedError(FoundryAPIError): message = "Foundry SQL Query Failed." - def __init__(self, response: requests.Response): + def __init__(self, response: requests.Response, **context_kwargs): kwargs = {} info = "" @@ -51,6 +51,9 @@ def __init__(self, response: requests.Response): # If any error occurs during extraction, fall back to empty self.error_message = "" + # Merge context kwargs (e.g., query, branch) with extracted error parameters + kwargs.update(context_kwargs) + super().__init__(response=response, info=info, **kwargs) diff --git a/tests/unit/clients/test_foundry_sql_server.py b/tests/unit/clients/test_foundry_sql_server.py index b93c2ef0..f3c1d0be 100644 --- a/tests/unit/clients/test_foundry_sql_server.py +++ b/tests/unit/clients/test_foundry_sql_server.py @@ -303,3 +303,54 @@ def test_v2_query_failed_error_details(mocker, test_context_mock): exception_str = str(exception.value) assert "COLUMN_NAME" in exception_str assert "my_table" in exception_str + + +def test_v2_polling_error_includes_context(mocker, test_context_mock): + """Test that polling errors include query context for better debugging.""" + mocker.patch("time.sleep") + + test_query = "SELECT * FROM `ri.foundry.main.dataset.test-dataset`" + + # Mock the api_query endpoint (initial query execution) + test_context_mock.mock_adapter.register_uri( + "POST", + build_api_url(TEST_HOST.url, "foundry-sql-server", "sql-endpoint/v1/queries/query"), + json={"type": "running", "running": {"queryHandle": {"queryId": "test-query-id", "type": "foundry"}}}, + ) + + # Mock the api_status endpoint with polling error + test_context_mock.mock_adapter.register_uri( + "POST", + build_api_url(TEST_HOST.url, "foundry-sql-server", "sql-endpoint/v1/queries/status"), + json={ + "status": { + "type": "failed", + "failed": { + "errorCode": "ModuleGroupService:ErrorPollingModule", + "errorInstanceId": "5be87070-3aa3-4ed6-aa6a-d9b5041885af", + "errorMessage": "Error polling for job status. Please resubmit.", + "retryable": False, + }, + } + }, + ) + + with pytest.raises(FoundrySqlQueryFailedError) as exception: + test_context_mock.foundry_sql_server_v2.query_foundry_sql(test_query) + + # Verify error details are extracted + assert exception.value.error_code == "ModuleGroupService:ErrorPollingModule" + assert exception.value.error_instance_id == "5be87070-3aa3-4ed6-aa6a-d9b5041885af" + assert exception.value.error_message == "Error polling for job status. Please resubmit." + + # Verify query context is included in the error + assert exception.value.query == test_query + assert exception.value.branch == "master" + assert exception.value.dialect == "SPARK" + + # Verify context appears in exception string + exception_str = str(exception.value) + assert "query = " + test_query in exception_str + assert "branch = master" in exception_str + assert "dialect = SPARK" in exception_str + assert "ModuleGroupService:ErrorPollingModule" in exception_str