From cc0249c3128f0a22aba5c2a43327ddd3061c688c Mon Sep 17 00:00:00 2001 From: ChengHui Chen <27797326+chenghuichen@users.noreply.github.com> Date: Tue, 27 Jan 2026 11:29:36 +0800 Subject: [PATCH 1/5] support datatype of ROW/STRUCT --- .../java/org/apache/paimon/JavaPyE2ETest.java | 190 +++++++++--------- paimon-python/pypaimon/schema/data_types.py | 17 ++ .../pypaimon/tests/data_types_test.py | 71 ++++++- .../tests/e2e/java_py_read_write_test.py | 40 +++- 4 files changed, 221 insertions(+), 97 deletions(-) diff --git a/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java b/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java index ef9ba888fced..4bf02ec931a1 100644 --- a/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java +++ b/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java @@ -103,79 +103,6 @@ public void before() throws Exception { } } - @Test - @EnabledIfSystemProperty(named = "run.e2e.tests", matches = "true") - public void testJavaWriteReadAppendTable() throws Exception { - for (String format : Arrays.asList("parquet", "orc", "avro")) { - Identifier identifier = identifier("mixed_test_append_tablej_" + format); - Schema schema = - Schema.newBuilder() - .column("id", DataTypes.INT()) - .column("name", DataTypes.STRING()) - .column("category", DataTypes.STRING()) - .column("value", DataTypes.DOUBLE()) - .column("ts", DataTypes.TIMESTAMP()) - .column("ts_ltz", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()) - .partitionKeys("category") - .option("dynamic-partition-overwrite", "false") - .option("file.format", format) - .build(); - - catalog.createTable(identifier, schema, true); - Table table = catalog.getTable(identifier); - FileStoreTable fileStoreTable = (FileStoreTable) table; - - try (StreamTableWrite write = fileStoreTable.newWrite(commitUser); - InnerTableCommit commit = fileStoreTable.newCommit(commitUser)) { - - write.write(createRow6Cols(1, "Apple", "Fruit", 1.5, 1000000L, 2000000L)); - write.write(createRow6Cols(2, "Banana", "Fruit", 0.8, 1000001L, 2000001L)); - write.write(createRow6Cols(3, "Carrot", "Vegetable", 0.6, 1000002L, 2000002L)); - write.write(createRow6Cols(4, "Broccoli", "Vegetable", 1.2, 1000003L, 2000003L)); - write.write(createRow6Cols(5, "Chicken", "Meat", 5.0, 1000004L, 2000004L)); - write.write(createRow6Cols(6, "Beef", "Meat", 8.0, 1000005L, 2000005L)); - - commit.commit(0, write.prepareCommit(true, 0)); - } - - List splits = - new ArrayList<>(fileStoreTable.newSnapshotReader().read().dataSplits()); - TableRead read = fileStoreTable.newRead(); - List res = - getResult( - read, - splits, - row -> DataFormatTestUtil.toStringNoRowKind(row, table.rowType())); - assertThat(res) - .containsExactlyInAnyOrder( - "1, Apple, Fruit, 1.5, 1970-01-01T00:16:40, 1970-01-01T00:33:20", - "2, Banana, Fruit, 0.8, 1970-01-01T00:16:40.001, 1970-01-01T00:33:20.001", - "3, Carrot, Vegetable, 0.6, 1970-01-01T00:16:40.002, 1970-01-01T00:33:20.002", - "4, Broccoli, Vegetable, 1.2, 1970-01-01T00:16:40.003, 1970-01-01T00:33:20.003", - "5, Chicken, Meat, 5.0, 1970-01-01T00:16:40.004, 1970-01-01T00:33:20.004", - "6, Beef, Meat, 8.0, 1970-01-01T00:16:40.005, 1970-01-01T00:33:20.005"); - } - } - - @Test - @EnabledIfSystemProperty(named = "run.e2e.tests", matches = "true") - public void testReadAppendTable() throws Exception { - for (String format : Arrays.asList("parquet", "orc", "avro")) { - Identifier identifier = identifier("mixed_test_append_tablep_" + format); - Table table = catalog.getTable(identifier); - FileStoreTable fileStoreTable = (FileStoreTable) table; - List splits = - new ArrayList<>(fileStoreTable.newSnapshotReader().read().dataSplits()); - TableRead read = fileStoreTable.newRead(); - List res = - getResult( - read, - splits, - row -> DataFormatTestUtil.toStringNoRowKind(row, table.rowType())); - System.out.println(res); - } - } - @Test @EnabledIfSystemProperty(named = "run.e2e.tests", matches = "true") public void testJavaWriteReadPkTable() throws Exception { @@ -189,6 +116,21 @@ public void testJavaWriteReadPkTable() throws Exception { .column("value", DataTypes.DOUBLE()) .column("ts", DataTypes.TIMESTAMP()) .column("ts_ltz", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()) + .column( + "metadata", + DataTypes.ROW( + DataTypes.FIELD(0, "source", DataTypes.STRING()), + DataTypes.FIELD(1, "created_at", DataTypes.BIGINT()), + DataTypes.FIELD( + 2, + "location", + DataTypes.ROW( + DataTypes.FIELD( + 0, "city", DataTypes.STRING()), + DataTypes.FIELD( + 1, + "country", + DataTypes.STRING()))))) .primaryKey("id") .partitionKeys("category") .option("dynamic-partition-overwrite", "false") @@ -203,12 +145,54 @@ public void testJavaWriteReadPkTable() throws Exception { try (StreamTableWrite write = fileStoreTable.newWrite(commitUser); InnerTableCommit commit = fileStoreTable.newCommit(commitUser)) { - write.write(createRow6Cols(1, "Apple", "Fruit", 1.5, 1000000L, 2000000L)); - write.write(createRow6Cols(2, "Banana", "Fruit", 0.8, 1000001L, 2000001L)); - write.write(createRow6Cols(3, "Carrot", "Vegetable", 0.6, 1000002L, 2000002L)); - write.write(createRow6Cols(4, "Broccoli", "Vegetable", 1.2, 1000003L, 2000003L)); - write.write(createRow6Cols(5, "Chicken", "Meat", 5.0, 1000004L, 2000004L)); - write.write(createRow6Cols(6, "Beef", "Meat", 8.0, 1000005L, 2000005L)); + write.write( + createRow7Cols( + 1, "Apple", "Fruit", 1.5, 1000000L, 2000000L, "store1", 1001L, + "Beijing", "China")); + write.write( + createRow7Cols( + 2, + "Banana", + "Fruit", + 0.8, + 1000001L, + 2000001L, + "store1", + 1002L, + "Shanghai", + "China")); + write.write( + createRow7Cols( + 3, + "Carrot", + "Vegetable", + 0.6, + 1000002L, + 2000002L, + "store2", + 1003L, + "Tokyo", + "Japan")); + write.write( + createRow7Cols( + 4, + "Broccoli", + "Vegetable", + 1.2, + 1000003L, + 2000003L, + "store2", + 1004L, + "Seoul", + "Korea")); + write.write( + createRow7Cols( + 5, "Chicken", "Meat", 5.0, 1000004L, 2000004L, "store3", 1005L, + "NewYork", "USA")); + write.write( + createRow7Cols( + 6, "Beef", "Meat", 8.0, 1000005L, 2000005L, "store3", 1006L, + "London", "UK")); commit.commit(0, write.prepareCommit(true, 0)); } @@ -223,12 +207,12 @@ public void testJavaWriteReadPkTable() throws Exception { row -> DataFormatTestUtil.toStringNoRowKind(row, table.rowType())); assertThat(res) .containsExactlyInAnyOrder( - "1, Apple, Fruit, 1.5, 1970-01-01T00:16:40, 1970-01-01T00:33:20", - "2, Banana, Fruit, 0.8, 1970-01-01T00:16:40.001, 1970-01-01T00:33:20.001", - "3, Carrot, Vegetable, 0.6, 1970-01-01T00:16:40.002, 1970-01-01T00:33:20.002", - "4, Broccoli, Vegetable, 1.2, 1970-01-01T00:16:40.003, 1970-01-01T00:33:20.003", - "5, Chicken, Meat, 5.0, 1970-01-01T00:16:40.004, 1970-01-01T00:33:20.004", - "6, Beef, Meat, 8.0, 1970-01-01T00:16:40.005, 1970-01-01T00:33:20.005"); + "1, Apple, Fruit, 1.5, 1970-01-01T00:16:40, 1970-01-01T00:33:20, +I[store1, 1001, +I[Beijing, China]]", + "2, Banana, Fruit, 0.8, 1970-01-01T00:16:40.001, 1970-01-01T00:33:20.001, +I[store1, 1002, +I[Shanghai, China]]", + "3, Carrot, Vegetable, 0.6, 1970-01-01T00:16:40.002, 1970-01-01T00:33:20.002, +I[store2, 1003, +I[Tokyo, Japan]]", + "4, Broccoli, Vegetable, 1.2, 1970-01-01T00:16:40.003, 1970-01-01T00:33:20.003, +I[store2, 1004, +I[Seoul, Korea]]", + "5, Chicken, Meat, 5.0, 1970-01-01T00:16:40.004, 1970-01-01T00:33:20.004, +I[store3, 1005, +I[NewYork, USA]]", + "6, Beef, Meat, 8.0, 1970-01-01T00:16:40.005, 1970-01-01T00:33:20.005, +I[store3, 1006, +I[London, UK]]"); } } @@ -385,14 +369,17 @@ public void testReadPkTable() throws Exception { assertThat(table.rowType().getFieldTypes().get(4)).isEqualTo(DataTypes.TIMESTAMP()); assertThat(table.rowType().getFieldTypes().get(5)) .isEqualTo(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()); + assertThat(table.rowType().getFieldTypes().get(6)).isInstanceOf(RowType.class); + RowType metadataType = (RowType) table.rowType().getFieldTypes().get(6); + assertThat(metadataType.getFieldTypes().get(2)).isInstanceOf(RowType.class); assertThat(res) .containsExactlyInAnyOrder( - "1, Apple, Fruit, 1.5, 1970-01-01T00:16:40, 1970-01-01T00:33:20", - "2, Banana, Fruit, 0.8, 1970-01-01T00:16:40.001, 1970-01-01T00:33:20.001", - "3, Carrot, Vegetable, 0.6, 1970-01-01T00:16:40.002, 1970-01-01T00:33:20.002", - "4, Broccoli, Vegetable, 1.2, 1970-01-01T00:16:40.003, 1970-01-01T00:33:20.003", - "5, Chicken, Meat, 5.0, 1970-01-01T00:16:40.004, 1970-01-01T00:33:20.004", - "6, Beef, Meat, 8.0, 1970-01-01T00:16:40.005, 1970-01-01T00:33:20.005"); + "1, Apple, Fruit, 1.5, 1970-01-01T00:16:40, 1970-01-01T00:33:20, +I[store1, 1001, +I[Beijing, China]]", + "2, Banana, Fruit, 0.8, 1970-01-01T00:16:40.001, 1970-01-01T00:33:20.001, +I[store1, 1002, +I[Shanghai, China]]", + "3, Carrot, Vegetable, 0.6, 1970-01-01T00:16:40.002, 1970-01-01T00:33:20.002, +I[store2, 1003, +I[Tokyo, Japan]]", + "4, Broccoli, Vegetable, 1.2, 1970-01-01T00:16:40.003, 1970-01-01T00:33:20.003, +I[store2, 1004, +I[Seoul, Korea]]", + "5, Chicken, Meat, 5.0, 1970-01-01T00:16:40.004, 1970-01-01T00:33:20.004, +I[store3, 1005, +I[NewYork, USA]]", + "6, Beef, Meat, 8.0, 1970-01-01T00:16:40.005, 1970-01-01T00:33:20.005, +I[store3, 1006, +I[London, UK]]"); } } @@ -424,15 +411,30 @@ protected FileStoreTable createFileStoreTable(Consumer configure, Path FileIOFinder.find(tablePath), tablePath, tableSchema, CatalogEnvironment.empty()); } - private static InternalRow createRow6Cols( - int id, String name, String category, double value, long ts, long tsLtz) { + private static InternalRow createRow7Cols( + int id, + String name, + String category, + double value, + long ts, + long tsLtz, + String metadataSource, + long metadataCreatedAt, + String city, + String country) { + GenericRow locationRow = + GenericRow.of(BinaryString.fromString(city), BinaryString.fromString(country)); + GenericRow metadataRow = + GenericRow.of( + BinaryString.fromString(metadataSource), metadataCreatedAt, locationRow); return GenericRow.of( id, BinaryString.fromString(name), BinaryString.fromString(category), value, org.apache.paimon.data.Timestamp.fromEpochMillis(ts), - org.apache.paimon.data.Timestamp.fromEpochMillis(tsLtz)); + org.apache.paimon.data.Timestamp.fromEpochMillis(tsLtz), + metadataRow); } protected GenericRow createRow3Cols(Object... values) { diff --git a/paimon-python/pypaimon/schema/data_types.py b/paimon-python/pypaimon/schema/data_types.py index 51787c7dcbb6..318ddfe02fcf 100755 --- a/paimon-python/pypaimon/schema/data_types.py +++ b/paimon-python/pypaimon/schema/data_types.py @@ -472,6 +472,12 @@ def from_paimon_type(data_type: DataType) -> pyarrow.DataType: key_type = PyarrowFieldParser.from_paimon_type(data_type.key) value_type = PyarrowFieldParser.from_paimon_type(data_type.value) return pyarrow.map_(key_type, value_type) + elif isinstance(data_type, RowType): + pa_fields = [] + for field in data_type.fields: + pa_field_type = PyarrowFieldParser.from_paimon_type(field.type) + pa_fields.append(pyarrow.field(field.name, pa_field_type, nullable=field.type.nullable)) + return pyarrow.struct(pa_fields) raise ValueError("Unsupported data type: {}".format(data_type)) @staticmethod @@ -539,6 +545,17 @@ def to_paimon_type(pa_type: pyarrow.DataType, nullable: bool) -> DataType: key_type = PyarrowFieldParser.to_paimon_type(pa_type.key_type, nullable) value_type = PyarrowFieldParser.to_paimon_type(pa_type.item_type, nullable) return MapType(nullable, key_type, value_type) + elif types.is_struct(pa_type): + pa_type: pyarrow.StructType + fields = [] + for i, pa_field in enumerate(pa_type): + field_type = PyarrowFieldParser.to_paimon_type(pa_field.type, pa_field.nullable) + fields.append(DataField( + id=i, + name=pa_field.name, + type=field_type + )) + return RowType(nullable, fields) if type_name is not None: return AtomicType(type_name, nullable) raise ValueError("Unsupported pyarrow type: {}".format(pa_type)) diff --git a/paimon-python/pypaimon/tests/data_types_test.py b/paimon-python/pypaimon/tests/data_types_test.py index 53644e24c571..6ace872997a5 100755 --- a/paimon-python/pypaimon/tests/data_types_test.py +++ b/paimon-python/pypaimon/tests/data_types_test.py @@ -17,8 +17,10 @@ """ import unittest from parameterized import parameterized +import pyarrow as pa -from pypaimon.schema.data_types import DataField, AtomicType, ArrayType, MultisetType, MapType, RowType +from pypaimon.schema.data_types import (DataField, AtomicType, ArrayType, MultisetType, MapType, + RowType, PyarrowFieldParser) class DataTypesTest(unittest.TestCase): @@ -65,3 +67,70 @@ def test_row_type(self): DataField(1, "b", AtomicType("TIMESTAMP(6)"),)]) self.assertEqual(str(row_data), str(RowType.from_dict(row_data.to_dict()))) + + def test_struct_from_paimon_to_pyarrow(self): + paimon_row = RowType( + nullable=True, + fields=[ + DataField(0, "field1", AtomicType("INT")), + DataField(1, "field2", AtomicType("STRING")), + DataField(2, "field3", AtomicType("DOUBLE")) + ] + ) + pa_struct = PyarrowFieldParser.from_paimon_type(paimon_row) + + self.assertTrue(pa.types.is_struct(pa_struct)) + self.assertEqual(len(pa_struct), 3) + self.assertEqual(pa_struct[0].name, "field1") + self.assertEqual(pa_struct[1].name, "field2") + self.assertEqual(pa_struct[2].name, "field3") + self.assertTrue(pa.types.is_int32(pa_struct[0].type)) + self.assertTrue(pa.types.is_string(pa_struct[1].type)) + self.assertTrue(pa.types.is_float64(pa_struct[2].type)) + + def test_struct_from_pyarrow_to_paimon(self): + pa_struct = pa.struct([ + pa.field("name", pa.string()), + pa.field("age", pa.int32()), + pa.field("score", pa.float64()) + ]) + paimon_row = PyarrowFieldParser.to_paimon_type(pa_struct, nullable=True) + + self.assertIsInstance(paimon_row, RowType) + self.assertTrue(paimon_row.nullable) + self.assertEqual(len(paimon_row.fields), 3) + self.assertEqual(paimon_row.fields[0].name, "name") + self.assertEqual(paimon_row.fields[1].name, "age") + self.assertEqual(paimon_row.fields[2].name, "score") + self.assertEqual(paimon_row.fields[0].type.type, "STRING") + self.assertEqual(paimon_row.fields[1].type.type, "INT") + self.assertEqual(paimon_row.fields[2].type.type, "DOUBLE") + + def test_nested_field_roundtrip(self): + nested_field = RowType( + nullable=True, + fields=[ + DataField(0, "inner_field1", AtomicType("STRING")), + DataField(1, "inner_field2", AtomicType("INT")) + ] + ) + paimon_row = RowType( + nullable=True, + fields=[ + DataField(0, "outer_field1", AtomicType("BIGINT")), + DataField(1, "nested", nested_field) + ] + ) + pa_struct = PyarrowFieldParser.from_paimon_type(paimon_row) + + converted_paimon_row = PyarrowFieldParser.to_paimon_type(pa_struct, nullable=True) + self.assertIsInstance(converted_paimon_row, RowType) + self.assertEqual(len(converted_paimon_row.fields), 2) + self.assertEqual(converted_paimon_row.fields[0].name, "outer_field1") + self.assertEqual(converted_paimon_row.fields[1].name, "nested") + + converted_nested_field = converted_paimon_row.fields[1].type + self.assertIsInstance(converted_nested_field, RowType) + self.assertEqual(len(converted_nested_field.fields), 2) + self.assertEqual(converted_nested_field.fields[0].name, "inner_field1") + self.assertEqual(converted_nested_field.fields[1].name, "inner_field2") diff --git a/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py b/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py index 71e1fbb217ba..801c433e600a 100644 --- a/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py +++ b/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py @@ -127,7 +127,15 @@ def test_py_write_read_pk_table(self, file_format): ('category', pa.string()), ('value', pa.float64()), ('ts', pa.timestamp('us')), - ('ts_ltz', pa.timestamp('us', tz='UTC')) + ('ts_ltz', pa.timestamp('us', tz='UTC')), + ('metadata', pa.struct([ + pa.field('source', pa.string()), + pa.field('created_at', pa.int64()), + pa.field('location', pa.struct([ + pa.field('city', pa.string()), + pa.field('country', pa.string()) + ])) + ])) ]) table_name = f'default.mixed_test_pk_tablep_{file_format}' @@ -169,7 +177,15 @@ def test_py_write_read_pk_table(self, file_format): 'category': ['Fruit', 'Fruit', 'Vegetable', 'Vegetable', 'Meat', 'Meat'], 'value': [1.5, 0.8, 0.6, 1.2, 5.0, 8.0], 'ts': pd.to_datetime([1000000, 1000001, 1000002, 1000003, 1000004, 1000005], unit='ms'), - 'ts_ltz': pd.to_datetime([2000000, 2000001, 2000002, 2000003, 2000004, 2000005], unit='ms', utc=True) + 'ts_ltz': pd.to_datetime([2000000, 2000001, 2000002, 2000003, 2000004, 2000005], unit='ms', utc=True), + 'metadata': [ + {'source': 'store1', 'created_at': 1001, 'location': {'city': 'Beijing', 'country': 'China'}}, + {'source': 'store1', 'created_at': 1002, 'location': {'city': 'Shanghai', 'country': 'China'}}, + {'source': 'store2', 'created_at': 1003, 'location': {'city': 'Tokyo', 'country': 'Japan'}}, + {'source': 'store2', 'created_at': 1004, 'location': {'city': 'Seoul', 'country': 'Korea'}}, + {'source': 'store3', 'created_at': 1005, 'location': {'city': 'NewYork', 'country': 'USA'}}, + {'source': 'store3', 'created_at': 1006, 'location': {'city': 'London', 'country': 'UK'}} + ] }) write_builder = table.new_batch_write_builder() table_write = write_builder.new_write() @@ -210,11 +226,31 @@ def test_read_pk_table(self, file_format): if file_format != "lance": self.assertEqual(table.fields[4].type.type, "TIMESTAMP(6)") self.assertEqual(table.fields[5].type.type, "TIMESTAMP(6) WITH LOCAL TIME ZONE") + from pypaimon.schema.data_types import RowType + self.assertIsInstance(table.fields[6].type, RowType) + metadata_fields = table.fields[6].type.fields + self.assertEqual(len(metadata_fields), 3) + self.assertEqual(metadata_fields[0].name, 'source') + self.assertEqual(metadata_fields[1].name, 'created_at') + self.assertEqual(metadata_fields[2].name, 'location') + self.assertIsInstance(metadata_fields[2].type, RowType) + else: + from pypaimon.schema.data_types import RowType + self.assertIsInstance(table.fields[4].type, RowType) + metadata_fields = table.fields[4].type.fields + self.assertEqual(len(metadata_fields), 3) + self.assertIsInstance(metadata_fields[2].type, RowType) + # Data order may vary due to partitioning/bucketing, so compare as sets expected_names = {'Apple', 'Banana', 'Carrot', 'Broccoli', 'Chicken', 'Beef'} actual_names = set(res['name'].tolist()) self.assertEqual(actual_names, expected_names) + # Verify metadata column can be read and contains nested structures + if 'metadata' in res.columns: + self.assertFalse(res['metadata'].isnull().all()) + print(f"Format: {file_format}, Metadata column sample: {res['metadata'].iloc[0]}") + # For primary key tables, verify that _VALUE_KIND is written correctly # by checking if we can read the raw data with system fields # Note: Normal read filters out system fields, so we verify through Java read From 7f5306f14a34486f76d3d66787956fa17568df38 Mon Sep 17 00:00:00 2001 From: ChengHui Chen <27797326+chenghuichen@users.noreply.github.com> Date: Tue, 27 Jan 2026 11:38:56 +0800 Subject: [PATCH 2/5] support datatype of ROW/STRUCT --- .../java/org/apache/paimon/JavaPyE2ETest.java | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java b/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java index 4bf02ec931a1..aba75f488f30 100644 --- a/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java +++ b/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java @@ -103,6 +103,25 @@ public void before() throws Exception { } } + @Test + @EnabledIfSystemProperty(named = "run.e2e.tests", matches = "true") + public void testReadAppendTable() throws Exception { + for (String format : Arrays.asList("parquet", "orc", "avro")) { + Identifier identifier = identifier("mixed_test_append_tablep_" + format); + Table table = catalog.getTable(identifier); + FileStoreTable fileStoreTable = (FileStoreTable) table; + List splits = + new ArrayList<>(fileStoreTable.newSnapshotReader().read().dataSplits()); + TableRead read = fileStoreTable.newRead(); + List res = + getResult( + read, + splits, + row -> DataFormatTestUtil.toStringNoRowKind(row, table.rowType())); + System.out.println(res); + } + } + @Test @EnabledIfSystemProperty(named = "run.e2e.tests", matches = "true") public void testJavaWriteReadPkTable() throws Exception { From 98e1b2ed6f1196a969db2f13384ac82dcea56514 Mon Sep 17 00:00:00 2001 From: ChengHui Chen <27797326+chenghuichen@users.noreply.github.com> Date: Tue, 27 Jan 2026 15:04:48 +0800 Subject: [PATCH 3/5] support datatype of ROW/STRUCT --- .../java/org/apache/paimon/JavaPyE2ETest.java | 54 ++++++++++++------- .../tests/e2e/java_py_read_write_test.py | 7 --- 2 files changed, 34 insertions(+), 27 deletions(-) diff --git a/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java b/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java index aba75f488f30..99e14a010f38 100644 --- a/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java +++ b/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java @@ -220,18 +220,15 @@ public void testJavaWriteReadPkTable() throws Exception { new ArrayList<>(fileStoreTable.newSnapshotReader().read().dataSplits()); TableRead read = fileStoreTable.newRead(); List res = - getResult( - read, - splits, - row -> DataFormatTestUtil.toStringNoRowKind(row, table.rowType())); + getResult(read, splits, row -> rowToStringWithStruct(row, table.rowType())); assertThat(res) .containsExactlyInAnyOrder( - "1, Apple, Fruit, 1.5, 1970-01-01T00:16:40, 1970-01-01T00:33:20, +I[store1, 1001, +I[Beijing, China]]", - "2, Banana, Fruit, 0.8, 1970-01-01T00:16:40.001, 1970-01-01T00:33:20.001, +I[store1, 1002, +I[Shanghai, China]]", - "3, Carrot, Vegetable, 0.6, 1970-01-01T00:16:40.002, 1970-01-01T00:33:20.002, +I[store2, 1003, +I[Tokyo, Japan]]", - "4, Broccoli, Vegetable, 1.2, 1970-01-01T00:16:40.003, 1970-01-01T00:33:20.003, +I[store2, 1004, +I[Seoul, Korea]]", - "5, Chicken, Meat, 5.0, 1970-01-01T00:16:40.004, 1970-01-01T00:33:20.004, +I[store3, 1005, +I[NewYork, USA]]", - "6, Beef, Meat, 8.0, 1970-01-01T00:16:40.005, 1970-01-01T00:33:20.005, +I[store3, 1006, +I[London, UK]]"); + "+I[1, Apple, Fruit, 1.5, 1970-01-01T00:16:40, 1970-01-01T00:33:20, (store1, 1001, (Beijing, China))]", + "+I[2, Banana, Fruit, 0.8, 1970-01-01T00:16:40.001, 1970-01-01T00:33:20.001, (store1, 1002, (Shanghai, China))]", + "+I[3, Carrot, Vegetable, 0.6, 1970-01-01T00:16:40.002, 1970-01-01T00:33:20.002, (store2, 1003, (Tokyo, Japan))]", + "+I[4, Broccoli, Vegetable, 1.2, 1970-01-01T00:16:40.003, 1970-01-01T00:33:20.003, (store2, 1004, (Seoul, Korea))]", + "+I[5, Chicken, Meat, 5.0, 1970-01-01T00:16:40.004, 1970-01-01T00:33:20.004, (store3, 1005, (NewYork, USA))]", + "+I[6, Beef, Meat, 8.0, 1970-01-01T00:16:40.005, 1970-01-01T00:33:20.005, (store3, 1006, (London, UK))]"); } } @@ -380,10 +377,7 @@ public void testReadPkTable() throws Exception { new ArrayList<>(fileStoreTable.newSnapshotReader().read().dataSplits()); TableRead read = fileStoreTable.newRead(); List res = - getResult( - read, - splits, - row -> DataFormatTestUtil.toStringNoRowKind(row, table.rowType())); + getResult(read, splits, row -> rowToStringWithStruct(row, table.rowType())); System.out.println("Result for " + format + " : " + res); assertThat(table.rowType().getFieldTypes().get(4)).isEqualTo(DataTypes.TIMESTAMP()); assertThat(table.rowType().getFieldTypes().get(5)) @@ -393,12 +387,12 @@ public void testReadPkTable() throws Exception { assertThat(metadataType.getFieldTypes().get(2)).isInstanceOf(RowType.class); assertThat(res) .containsExactlyInAnyOrder( - "1, Apple, Fruit, 1.5, 1970-01-01T00:16:40, 1970-01-01T00:33:20, +I[store1, 1001, +I[Beijing, China]]", - "2, Banana, Fruit, 0.8, 1970-01-01T00:16:40.001, 1970-01-01T00:33:20.001, +I[store1, 1002, +I[Shanghai, China]]", - "3, Carrot, Vegetable, 0.6, 1970-01-01T00:16:40.002, 1970-01-01T00:33:20.002, +I[store2, 1003, +I[Tokyo, Japan]]", - "4, Broccoli, Vegetable, 1.2, 1970-01-01T00:16:40.003, 1970-01-01T00:33:20.003, +I[store2, 1004, +I[Seoul, Korea]]", - "5, Chicken, Meat, 5.0, 1970-01-01T00:16:40.004, 1970-01-01T00:33:20.004, +I[store3, 1005, +I[NewYork, USA]]", - "6, Beef, Meat, 8.0, 1970-01-01T00:16:40.005, 1970-01-01T00:33:20.005, +I[store3, 1006, +I[London, UK]]"); + "+I[1, Apple, Fruit, 1.5, 1970-01-01T00:16:40, 1970-01-01T00:33:20, (store1, 1001, (Beijing, China))]", + "+I[2, Banana, Fruit, 0.8, 1970-01-01T00:16:40.001, 1970-01-01T00:33:20.001, (store1, 1002, (Shanghai, China))]", + "+I[3, Carrot, Vegetable, 0.6, 1970-01-01T00:16:40.002, 1970-01-01T00:33:20.002, (store2, 1003, (Tokyo, Japan))]", + "+I[4, Broccoli, Vegetable, 1.2, 1970-01-01T00:16:40.003, 1970-01-01T00:33:20.003, (store2, 1004, (Seoul, Korea))]", + "+I[5, Chicken, Meat, 5.0, 1970-01-01T00:16:40.004, 1970-01-01T00:33:20.004, (store3, 1005, (NewYork, USA))]", + "+I[6, Beef, Meat, 8.0, 1970-01-01T00:16:40.005, 1970-01-01T00:33:20.005, (store3, 1006, (London, UK))]"); } } @@ -463,4 +457,24 @@ protected GenericRow createRow3Cols(Object... values) { protected GenericRow createRow3ColsWithKind(RowKind rowKind, Object... values) { return GenericRow.ofKind(rowKind, values[0], values[1], values[2]); } + + private static String rowToStringWithStruct(InternalRow row, RowType type) { + StringBuilder build = new StringBuilder(); + build.append(row.getRowKind().shortString()).append("["); + for (int i = 0; i < type.getFieldCount(); i++) { + if (i != 0) { + build.append(", "); + } + if (row.isNullAt(i)) { + build.append("NULL"); + } else { + InternalRow.FieldGetter fieldGetter = + InternalRow.createFieldGetter(type.getTypeAt(i), i); + Object field = fieldGetter.getFieldOrNull(row); + build.append(DataFormatTestUtil.getDataFieldString(field, type.getTypeAt(i))); + } + } + build.append("]"); + return build.toString(); + } } diff --git a/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py b/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py index 801c433e600a..b56ced657c17 100644 --- a/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py +++ b/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py @@ -234,12 +234,6 @@ def test_read_pk_table(self, file_format): self.assertEqual(metadata_fields[1].name, 'created_at') self.assertEqual(metadata_fields[2].name, 'location') self.assertIsInstance(metadata_fields[2].type, RowType) - else: - from pypaimon.schema.data_types import RowType - self.assertIsInstance(table.fields[4].type, RowType) - metadata_fields = table.fields[4].type.fields - self.assertEqual(len(metadata_fields), 3) - self.assertIsInstance(metadata_fields[2].type, RowType) # Data order may vary due to partitioning/bucketing, so compare as sets expected_names = {'Apple', 'Banana', 'Carrot', 'Broccoli', 'Chicken', 'Beef'} @@ -249,7 +243,6 @@ def test_read_pk_table(self, file_format): # Verify metadata column can be read and contains nested structures if 'metadata' in res.columns: self.assertFalse(res['metadata'].isnull().all()) - print(f"Format: {file_format}, Metadata column sample: {res['metadata'].iloc[0]}") # For primary key tables, verify that _VALUE_KIND is written correctly # by checking if we can read the raw data with system fields From 407920f2304e725013190ee1b1ebe38086b9df76 Mon Sep 17 00:00:00 2001 From: ChengHui Chen <27797326+chenghuichen@users.noreply.github.com> Date: Tue, 27 Jan 2026 16:05:02 +0800 Subject: [PATCH 4/5] support datatype of ROW/STRUCT --- .github/workflows/e2e-tests-flink-1.x.yml | 1 + .github/workflows/e2e-tests-flink-2.x-jdk11.yml | 1 + .github/workflows/utitcase-flink-1.x-others.yml | 1 + .github/workflows/utitcase-spark-4.x.yml | 1 + .github/workflows/utitcase.yml | 1 + 5 files changed, 5 insertions(+) diff --git a/.github/workflows/e2e-tests-flink-1.x.yml b/.github/workflows/e2e-tests-flink-1.x.yml index 476b17ec3edd..47033b9f6e82 100644 --- a/.github/workflows/e2e-tests-flink-1.x.yml +++ b/.github/workflows/e2e-tests-flink-1.x.yml @@ -27,6 +27,7 @@ on: - 'paimon-python/**' - '.github/workflows/paimon-python-checks.yml' - 'paimon-lucene/**' + - 'paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java' env: JDK_VERSION: 8 diff --git a/.github/workflows/e2e-tests-flink-2.x-jdk11.yml b/.github/workflows/e2e-tests-flink-2.x-jdk11.yml index 941fcc0e86fa..49ccc7e99a12 100644 --- a/.github/workflows/e2e-tests-flink-2.x-jdk11.yml +++ b/.github/workflows/e2e-tests-flink-2.x-jdk11.yml @@ -27,6 +27,7 @@ on: - 'paimon-python/**' - '.github/workflows/paimon-python-checks.yml' - 'paimon-lucene/**' + - 'paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java' env: JDK_VERSION: 11 diff --git a/.github/workflows/utitcase-flink-1.x-others.yml b/.github/workflows/utitcase-flink-1.x-others.yml index 3c21037c68fe..0a1d9715c6d0 100644 --- a/.github/workflows/utitcase-flink-1.x-others.yml +++ b/.github/workflows/utitcase-flink-1.x-others.yml @@ -26,6 +26,7 @@ on: - 'paimon-python/**' - '.github/workflows/paimon-python-checks.yml' - 'paimon-lucene/**' + - 'paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java' env: JDK_VERSION: 8 diff --git a/.github/workflows/utitcase-spark-4.x.yml b/.github/workflows/utitcase-spark-4.x.yml index fc031342c48b..50ef7f8bad69 100644 --- a/.github/workflows/utitcase-spark-4.x.yml +++ b/.github/workflows/utitcase-spark-4.x.yml @@ -27,6 +27,7 @@ on: - 'paimon-python/**' - '.github/workflows/paimon-python-checks.yml' - 'paimon-lucene/**' + - 'paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java' env: JDK_VERSION: 17 diff --git a/.github/workflows/utitcase.yml b/.github/workflows/utitcase.yml index 52423a026e2f..915ec0385ac9 100644 --- a/.github/workflows/utitcase.yml +++ b/.github/workflows/utitcase.yml @@ -29,6 +29,7 @@ on: - 'paimon-lucene/**' - 'paimon-faiss/**' - '.github/workflows/faiss-vector-index-tests.yml' + - 'paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java' env: JDK_VERSION: 8 From f97e08017c9411bf287d6de71c145bd5f629bf96 Mon Sep 17 00:00:00 2001 From: ChengHui Chen <27797326+chenghuichen@users.noreply.github.com> Date: Tue, 27 Jan 2026 16:09:08 +0800 Subject: [PATCH 5/5] support datatype of ROW/STRUCT --- .github/workflows/utitcase-flink-1.x-common.yml | 1 + .github/workflows/utitcase-flink-2.x-jdk11.yml | 1 + .github/workflows/utitcase-jdk11.yml | 1 + .github/workflows/utitcase-spark-3.x.yml | 1 + 4 files changed, 4 insertions(+) diff --git a/.github/workflows/utitcase-flink-1.x-common.yml b/.github/workflows/utitcase-flink-1.x-common.yml index 03a6d1e815a3..b5f1bab891b2 100644 --- a/.github/workflows/utitcase-flink-1.x-common.yml +++ b/.github/workflows/utitcase-flink-1.x-common.yml @@ -26,6 +26,7 @@ on: - 'paimon-python/**' - '.github/workflows/paimon-python-checks.yml' - 'paimon-lucene/**' + - 'paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java' env: JDK_VERSION: 8 diff --git a/.github/workflows/utitcase-flink-2.x-jdk11.yml b/.github/workflows/utitcase-flink-2.x-jdk11.yml index 4be7d4ae62d6..129520a0b5aa 100644 --- a/.github/workflows/utitcase-flink-2.x-jdk11.yml +++ b/.github/workflows/utitcase-flink-2.x-jdk11.yml @@ -27,6 +27,7 @@ on: - 'paimon-python/**' - '.github/workflows/paimon-python-checks.yml' - 'paimon-lucene/**' + - 'paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java' env: JDK_VERSION: 11 diff --git a/.github/workflows/utitcase-jdk11.yml b/.github/workflows/utitcase-jdk11.yml index a5db2019a370..931f5e7b3b34 100644 --- a/.github/workflows/utitcase-jdk11.yml +++ b/.github/workflows/utitcase-jdk11.yml @@ -26,6 +26,7 @@ on: - '**/*.md' - 'paimon-python/**' - '.github/workflows/paimon-python-checks.yml' + - 'paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java' env: JDK_VERSION: 11 diff --git a/.github/workflows/utitcase-spark-3.x.yml b/.github/workflows/utitcase-spark-3.x.yml index 3fff587799a3..a08c5bad45b9 100644 --- a/.github/workflows/utitcase-spark-3.x.yml +++ b/.github/workflows/utitcase-spark-3.x.yml @@ -27,6 +27,7 @@ on: - 'paimon-python/**' - '.github/workflows/paimon-python-checks.yml' - 'paimon-lucene/**' + - 'paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java' env: JDK_VERSION: 8