1 change: 1 addition & 0 deletions .github/workflows/e2e-tests-flink-1.x.yml
@@ -27,6 +27,7 @@ on:
- 'paimon-python/**'
- '.github/workflows/paimon-python-checks.yml'
- 'paimon-lucene/**'
- 'paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java'

env:
JDK_VERSION: 8
1 change: 1 addition & 0 deletions .github/workflows/e2e-tests-flink-2.x-jdk11.yml
@@ -27,6 +27,7 @@ on:
- 'paimon-python/**'
- '.github/workflows/paimon-python-checks.yml'
- 'paimon-lucene/**'
- 'paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java'

env:
JDK_VERSION: 11
1 change: 1 addition & 0 deletions .github/workflows/utitcase-flink-1.x-common.yml
@@ -26,6 +26,7 @@ on:
- 'paimon-python/**'
- '.github/workflows/paimon-python-checks.yml'
- 'paimon-lucene/**'
- 'paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java'

env:
JDK_VERSION: 8
1 change: 1 addition & 0 deletions .github/workflows/utitcase-flink-1.x-others.yml
@@ -26,6 +26,7 @@ on:
- 'paimon-python/**'
- '.github/workflows/paimon-python-checks.yml'
- 'paimon-lucene/**'
- 'paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java'

env:
JDK_VERSION: 8
1 change: 1 addition & 0 deletions .github/workflows/utitcase-flink-2.x-jdk11.yml
@@ -27,6 +27,7 @@ on:
- 'paimon-python/**'
- '.github/workflows/paimon-python-checks.yml'
- 'paimon-lucene/**'
- 'paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java'

env:
JDK_VERSION: 11
1 change: 1 addition & 0 deletions .github/workflows/utitcase-jdk11.yml
@@ -26,6 +26,7 @@ on:
- '**/*.md'
- 'paimon-python/**'
- '.github/workflows/paimon-python-checks.yml'
- 'paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java'

env:
JDK_VERSION: 11
1 change: 1 addition & 0 deletions .github/workflows/utitcase-spark-3.x.yml
@@ -27,6 +27,7 @@ on:
- 'paimon-python/**'
- '.github/workflows/paimon-python-checks.yml'
- 'paimon-lucene/**'
- 'paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java'

env:
JDK_VERSION: 8
1 change: 1 addition & 0 deletions .github/workflows/utitcase-spark-4.x.yml
@@ -27,6 +27,7 @@ on:
- 'paimon-python/**'
- '.github/workflows/paimon-python-checks.yml'
- 'paimon-lucene/**'
- 'paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java'

env:
JDK_VERSION: 17
1 change: 1 addition & 0 deletions .github/workflows/utitcase.yml
@@ -29,6 +29,7 @@ on:
- 'paimon-lucene/**'
- 'paimon-faiss/**'
- '.github/workflows/faiss-vector-index-tests.yml'
- 'paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java'

env:
JDK_VERSION: 8
201 changes: 118 additions & 83 deletions paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
@@ -103,60 +103,6 @@ public void before() throws Exception {
}
}

@Test
@EnabledIfSystemProperty(named = "run.e2e.tests", matches = "true")
public void testJavaWriteReadAppendTable() throws Exception {
for (String format : Arrays.asList("parquet", "orc", "avro")) {
Identifier identifier = identifier("mixed_test_append_tablej_" + format);
Schema schema =
Schema.newBuilder()
.column("id", DataTypes.INT())
.column("name", DataTypes.STRING())
.column("category", DataTypes.STRING())
.column("value", DataTypes.DOUBLE())
.column("ts", DataTypes.TIMESTAMP())
.column("ts_ltz", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())
.partitionKeys("category")
.option("dynamic-partition-overwrite", "false")
.option("file.format", format)
.build();

catalog.createTable(identifier, schema, true);
Table table = catalog.getTable(identifier);
FileStoreTable fileStoreTable = (FileStoreTable) table;

try (StreamTableWrite write = fileStoreTable.newWrite(commitUser);
InnerTableCommit commit = fileStoreTable.newCommit(commitUser)) {

write.write(createRow6Cols(1, "Apple", "Fruit", 1.5, 1000000L, 2000000L));
write.write(createRow6Cols(2, "Banana", "Fruit", 0.8, 1000001L, 2000001L));
write.write(createRow6Cols(3, "Carrot", "Vegetable", 0.6, 1000002L, 2000002L));
write.write(createRow6Cols(4, "Broccoli", "Vegetable", 1.2, 1000003L, 2000003L));
write.write(createRow6Cols(5, "Chicken", "Meat", 5.0, 1000004L, 2000004L));
write.write(createRow6Cols(6, "Beef", "Meat", 8.0, 1000005L, 2000005L));

commit.commit(0, write.prepareCommit(true, 0));
}

List<Split> splits =
new ArrayList<>(fileStoreTable.newSnapshotReader().read().dataSplits());
TableRead read = fileStoreTable.newRead();
List<String> res =
getResult(
read,
splits,
row -> DataFormatTestUtil.toStringNoRowKind(row, table.rowType()));
assertThat(res)
.containsExactlyInAnyOrder(
"1, Apple, Fruit, 1.5, 1970-01-01T00:16:40, 1970-01-01T00:33:20",
"2, Banana, Fruit, 0.8, 1970-01-01T00:16:40.001, 1970-01-01T00:33:20.001",
"3, Carrot, Vegetable, 0.6, 1970-01-01T00:16:40.002, 1970-01-01T00:33:20.002",
"4, Broccoli, Vegetable, 1.2, 1970-01-01T00:16:40.003, 1970-01-01T00:33:20.003",
"5, Chicken, Meat, 5.0, 1970-01-01T00:16:40.004, 1970-01-01T00:33:20.004",
"6, Beef, Meat, 8.0, 1970-01-01T00:16:40.005, 1970-01-01T00:33:20.005");
}
}

@Test
@EnabledIfSystemProperty(named = "run.e2e.tests", matches = "true")
public void testReadAppendTable() throws Exception {
@@ -189,6 +135,21 @@ public void testJavaWriteReadPkTable() throws Exception {
.column("value", DataTypes.DOUBLE())
.column("ts", DataTypes.TIMESTAMP())
.column("ts_ltz", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())
.column(
"metadata",
DataTypes.ROW(
DataTypes.FIELD(0, "source", DataTypes.STRING()),
DataTypes.FIELD(1, "created_at", DataTypes.BIGINT()),
DataTypes.FIELD(
2,
"location",
DataTypes.ROW(
DataTypes.FIELD(
0, "city", DataTypes.STRING()),
DataTypes.FIELD(
1,
"country",
DataTypes.STRING())))))
.primaryKey("id")
.partitionKeys("category")
.option("dynamic-partition-overwrite", "false")
@@ -203,12 +164,54 @@ public void testJavaWriteReadPkTable() {
try (StreamTableWrite write = fileStoreTable.newWrite(commitUser);
InnerTableCommit commit = fileStoreTable.newCommit(commitUser)) {

write.write(createRow6Cols(1, "Apple", "Fruit", 1.5, 1000000L, 2000000L));
write.write(createRow6Cols(2, "Banana", "Fruit", 0.8, 1000001L, 2000001L));
write.write(createRow6Cols(3, "Carrot", "Vegetable", 0.6, 1000002L, 2000002L));
write.write(createRow6Cols(4, "Broccoli", "Vegetable", 1.2, 1000003L, 2000003L));
write.write(createRow6Cols(5, "Chicken", "Meat", 5.0, 1000004L, 2000004L));
write.write(createRow6Cols(6, "Beef", "Meat", 8.0, 1000005L, 2000005L));
write.write(
createRow7Cols(
1, "Apple", "Fruit", 1.5, 1000000L, 2000000L, "store1", 1001L,
"Beijing", "China"));
write.write(
createRow7Cols(
2,
"Banana",
"Fruit",
0.8,
1000001L,
2000001L,
"store1",
1002L,
"Shanghai",
"China"));
write.write(
createRow7Cols(
3,
"Carrot",
"Vegetable",
0.6,
1000002L,
2000002L,
"store2",
1003L,
"Tokyo",
"Japan"));
write.write(
createRow7Cols(
4,
"Broccoli",
"Vegetable",
1.2,
1000003L,
2000003L,
"store2",
1004L,
"Seoul",
"Korea"));
write.write(
createRow7Cols(
5, "Chicken", "Meat", 5.0, 1000004L, 2000004L, "store3", 1005L,
"NewYork", "USA"));
write.write(
createRow7Cols(
6, "Beef", "Meat", 8.0, 1000005L, 2000005L, "store3", 1006L,
"London", "UK"));

commit.commit(0, write.prepareCommit(true, 0));
}
@@ -217,18 +220,15 @@ public void testJavaWriteReadPkTable() throws Exception {
new ArrayList<>(fileStoreTable.newSnapshotReader().read().dataSplits());
TableRead read = fileStoreTable.newRead();
List<String> res =
getResult(
read,
splits,
row -> DataFormatTestUtil.toStringNoRowKind(row, table.rowType()));
getResult(read, splits, row -> rowToStringWithStruct(row, table.rowType()));
assertThat(res)
.containsExactlyInAnyOrder(
"1, Apple, Fruit, 1.5, 1970-01-01T00:16:40, 1970-01-01T00:33:20",
"2, Banana, Fruit, 0.8, 1970-01-01T00:16:40.001, 1970-01-01T00:33:20.001",
"3, Carrot, Vegetable, 0.6, 1970-01-01T00:16:40.002, 1970-01-01T00:33:20.002",
"4, Broccoli, Vegetable, 1.2, 1970-01-01T00:16:40.003, 1970-01-01T00:33:20.003",
"5, Chicken, Meat, 5.0, 1970-01-01T00:16:40.004, 1970-01-01T00:33:20.004",
"6, Beef, Meat, 8.0, 1970-01-01T00:16:40.005, 1970-01-01T00:33:20.005");
"+I[1, Apple, Fruit, 1.5, 1970-01-01T00:16:40, 1970-01-01T00:33:20, (store1, 1001, (Beijing, China))]",
"+I[2, Banana, Fruit, 0.8, 1970-01-01T00:16:40.001, 1970-01-01T00:33:20.001, (store1, 1002, (Shanghai, China))]",
"+I[3, Carrot, Vegetable, 0.6, 1970-01-01T00:16:40.002, 1970-01-01T00:33:20.002, (store2, 1003, (Tokyo, Japan))]",
"+I[4, Broccoli, Vegetable, 1.2, 1970-01-01T00:16:40.003, 1970-01-01T00:33:20.003, (store2, 1004, (Seoul, Korea))]",
"+I[5, Chicken, Meat, 5.0, 1970-01-01T00:16:40.004, 1970-01-01T00:33:20.004, (store3, 1005, (NewYork, USA))]",
"+I[6, Beef, Meat, 8.0, 1970-01-01T00:16:40.005, 1970-01-01T00:33:20.005, (store3, 1006, (London, UK))]");
}
}

@@ -377,22 +377,22 @@ public void testReadPkTable() throws Exception {
new ArrayList<>(fileStoreTable.newSnapshotReader().read().dataSplits());
TableRead read = fileStoreTable.newRead();
List<String> res =
getResult(
read,
splits,
row -> DataFormatTestUtil.toStringNoRowKind(row, table.rowType()));
getResult(read, splits, row -> rowToStringWithStruct(row, table.rowType()));
System.out.println("Result for " + format + " : " + res);
assertThat(table.rowType().getFieldTypes().get(4)).isEqualTo(DataTypes.TIMESTAMP());
assertThat(table.rowType().getFieldTypes().get(5))
.isEqualTo(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE());
assertThat(table.rowType().getFieldTypes().get(6)).isInstanceOf(RowType.class);
RowType metadataType = (RowType) table.rowType().getFieldTypes().get(6);
assertThat(metadataType.getFieldTypes().get(2)).isInstanceOf(RowType.class);
assertThat(res)
.containsExactlyInAnyOrder(
"1, Apple, Fruit, 1.5, 1970-01-01T00:16:40, 1970-01-01T00:33:20",
"2, Banana, Fruit, 0.8, 1970-01-01T00:16:40.001, 1970-01-01T00:33:20.001",
"3, Carrot, Vegetable, 0.6, 1970-01-01T00:16:40.002, 1970-01-01T00:33:20.002",
"4, Broccoli, Vegetable, 1.2, 1970-01-01T00:16:40.003, 1970-01-01T00:33:20.003",
"5, Chicken, Meat, 5.0, 1970-01-01T00:16:40.004, 1970-01-01T00:33:20.004",
"6, Beef, Meat, 8.0, 1970-01-01T00:16:40.005, 1970-01-01T00:33:20.005");
"+I[1, Apple, Fruit, 1.5, 1970-01-01T00:16:40, 1970-01-01T00:33:20, (store1, 1001, (Beijing, China))]",
"+I[2, Banana, Fruit, 0.8, 1970-01-01T00:16:40.001, 1970-01-01T00:33:20.001, (store1, 1002, (Shanghai, China))]",
"+I[3, Carrot, Vegetable, 0.6, 1970-01-01T00:16:40.002, 1970-01-01T00:33:20.002, (store2, 1003, (Tokyo, Japan))]",
"+I[4, Broccoli, Vegetable, 1.2, 1970-01-01T00:16:40.003, 1970-01-01T00:33:20.003, (store2, 1004, (Seoul, Korea))]",
"+I[5, Chicken, Meat, 5.0, 1970-01-01T00:16:40.004, 1970-01-01T00:33:20.004, (store3, 1005, (NewYork, USA))]",
"+I[6, Beef, Meat, 8.0, 1970-01-01T00:16:40.005, 1970-01-01T00:33:20.005, (store3, 1006, (London, UK))]");
}
}

@@ -424,15 +424,30 @@ protected FileStoreTable createFileStoreTable(Consumer<Options> configure, Path
FileIOFinder.find(tablePath), tablePath, tableSchema, CatalogEnvironment.empty());
}

private static InternalRow createRow6Cols(
int id, String name, String category, double value, long ts, long tsLtz) {
private static InternalRow createRow7Cols(
int id,
String name,
String category,
double value,
long ts,
long tsLtz,
String metadataSource,
long metadataCreatedAt,
String city,
String country) {
GenericRow locationRow =
GenericRow.of(BinaryString.fromString(city), BinaryString.fromString(country));
GenericRow metadataRow =
GenericRow.of(
BinaryString.fromString(metadataSource), metadataCreatedAt, locationRow);
return GenericRow.of(
id,
BinaryString.fromString(name),
BinaryString.fromString(category),
value,
org.apache.paimon.data.Timestamp.fromEpochMillis(ts),
org.apache.paimon.data.Timestamp.fromEpochMillis(tsLtz));
org.apache.paimon.data.Timestamp.fromEpochMillis(tsLtz),
metadataRow);
}

protected GenericRow createRow3Cols(Object... values) {
@@ -442,4 +457,24 @@ protected GenericRow createRow3Cols(Object... values) {
protected GenericRow createRow3ColsWithKind(RowKind rowKind, Object... values) {
return GenericRow.ofKind(rowKind, values[0], values[1], values[2]);
}

private static String rowToStringWithStruct(InternalRow row, RowType type) {
StringBuilder build = new StringBuilder();
build.append(row.getRowKind().shortString()).append("[");
for (int i = 0; i < type.getFieldCount(); i++) {
if (i != 0) {
build.append(", ");
}
if (row.isNullAt(i)) {
build.append("NULL");
} else {
InternalRow.FieldGetter fieldGetter =
InternalRow.createFieldGetter(type.getTypeAt(i), i);
Object field = fieldGetter.getFieldOrNull(row);
build.append(DataFormatTestUtil.getDataFieldString(field, type.getTypeAt(i)));
}
}
build.append("]");
return build.toString();
}
}
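
Note: the nested `metadata` column that createRow7Cols writes above is expected to surface on the Python side as a nested Arrow struct. Below is a minimal illustrative sketch (not part of the diff) of that expected pyarrow type, assuming the conventional Paimon-to-Arrow mappings (STRING -> string, BIGINT -> int64):

import pyarrow

# Expected Arrow shape of the test table's `metadata` column:
# ROW<source STRING, created_at BIGINT, location ROW<city STRING, country STRING>>
expected = pyarrow.struct([
    pyarrow.field("source", pyarrow.string()),
    pyarrow.field("created_at", pyarrow.int64()),
    pyarrow.field("location", pyarrow.struct([
        pyarrow.field("city", pyarrow.string()),
        pyarrow.field("country", pyarrow.string()),
    ])),
])

# Both the outer type and its `location` member are structs.
assert pyarrow.types.is_struct(expected)
assert pyarrow.types.is_struct(expected.field("location").type)
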
17 changes: 17 additions & 0 deletions paimon-python/pypaimon/schema/data_types.py
@@ -472,6 +472,12 @@ def from_paimon_type(data_type: DataType) -> pyarrow.DataType:
key_type = PyarrowFieldParser.from_paimon_type(data_type.key)
value_type = PyarrowFieldParser.from_paimon_type(data_type.value)
return pyarrow.map_(key_type, value_type)
elif isinstance(data_type, RowType):
pa_fields = []
for field in data_type.fields:
pa_field_type = PyarrowFieldParser.from_paimon_type(field.type)
pa_fields.append(pyarrow.field(field.name, pa_field_type, nullable=field.type.nullable))
return pyarrow.struct(pa_fields)
raise ValueError("Unsupported data type: {}".format(data_type))

@staticmethod
@@ -539,6 +545,17 @@ def to_paimon_type(pa_type: pyarrow.DataType, nullable: bool) -> DataType:
key_type = PyarrowFieldParser.to_paimon_type(pa_type.key_type, nullable)
value_type = PyarrowFieldParser.to_paimon_type(pa_type.item_type, nullable)
return MapType(nullable, key_type, value_type)
elif types.is_struct(pa_type):
pa_type: pyarrow.StructType
fields = []
for i, pa_field in enumerate(pa_type):
field_type = PyarrowFieldParser.to_paimon_type(pa_field.type, pa_field.nullable)
fields.append(DataField(
id=i,
name=pa_field.name,
type=field_type
))
return RowType(nullable, fields)
if type_name is not None:
return AtomicType(type_name, nullable)
raise ValueError("Unsupported pyarrow type: {}".format(pa_type))