Conversation
…ryException while executing a BatchStatement work in progress.
Pull request overview
This PR implements automatic retry logic with smaller batch sizes when Cassandra rejects a batch as too large. When an InvalidQueryException is received while executing a BatchStatement, the code now catches this exception, wraps it in a BatchTooLargeException, and recursively splits the batch in half until it succeeds.
Key Changes:
- Introduced `BatchTooLargeException` to wrap batch size errors for both the Cassandra 2 and 4 drivers
- Modified `ExecutionUtils` to detect `InvalidQueryException` on batch statements and convert it to `BatchTooLargeException`
- Updated `PersistentActorUpdateEventProcessor` to recursively split batches in half when too large
- Added comprehensive unit tests for the batch splitting logic
Reviewed changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 17 comments.
| File | Description |
|---|---|
| `main/backplane-cassandra4/src/main/java/org/elasticsoftware/elasticactors/cassandra4/state/BatchTooLargeException.java` | New exception class to encapsulate batch size errors with batch metadata |
| `main/backplane-cassandra2/src/main/java/org/elasticsoftware/elasticactors/cassandra2/state/BatchTooLargeException.java` | New exception class for the Cassandra 2 driver with the same functionality |
| `main/backplane-cassandra4/src/main/java/org/elasticsoftware/elasticactors/cassandra4/util/ExecutionUtils.java` | Added `InvalidQueryException` handling to detect and convert batch size errors |
| `main/backplane-cassandra2/src/main/java/org/elasticsoftware/elasticactors/cassandra2/util/ExecutionUtils.java` | Added `InvalidQueryException` handling for the Cassandra 2 driver |
| `main/backplane-cassandra4/src/main/java/org/elasticsoftware/elasticactors/cassandra4/state/PersistentActorUpdateEventProcessor.java` | Refactored event processing and added recursive batch splitting on `BatchTooLargeException` |
| `main/backplane-cassandra2/src/main/java/org/elasticsoftware/elasticactors/cassandra2/state/PersistentActorUpdateEventProcessor.java` | Refactored event processing with recursive batch splitting for all batch execution methods |
| `main/backplane-cassandra4/src/test/java/org/elasticsoftware/elasticactors/cassandra4/state/PersistentActorUpdateEventProcessorTest.java` | Comprehensive test suite for batch splitting scenarios with 2, 3, and 9 events |
| `main/backplane-cassandra4/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker` | Enables the Mockito inline mock maker for testing final classes |
| `main/backplane-cassandra4/pom.xml` | Added test dependencies for mockito-core, testng, and slf4j-simple |
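For reference, the `org.mockito.plugins.MockMaker` resource listed above conventionally contains a single line that switches Mockito to the inline mock maker (which can mock final classes):

```
mock-maker-inline
```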
```java
} catch(InvalidQueryException e) {
    logger.error("InvalidQueryException with message {} on node {} while executing statement, will retry in case of BatchStatement",
        e.getMessage(),
        Optional.of(e.getEndPoint()).map(endPoint -> endPoint.resolve().toString()).orElse("UNKNOWN"));
```
Using Optional.of() instead of Optional.ofNullable() will throw a NullPointerException if e.getEndPoint() returns null. This should be Optional.ofNullable() to safely handle null values, consistent with other similar code patterns in this file.
```diff
- Optional.of(e.getEndPoint()).map(endPoint -> endPoint.resolve().toString()).orElse("UNKNOWN"));
+ Optional.ofNullable(e.getEndPoint()).map(endPoint -> endPoint.resolve().toString()).orElse("UNKNOWN"));
```
```java
} catch(BatchTooLargeException e) {
    int half = events.size() / 2;
    // batch is too large, so we need to split it up
    logger.warn(
        "Batch of byteSize {} is too large, splitting up in 2 batches. 1 of {} events and 1 of {} events",
        e.getBatchSize(),
        half,
        events.size() - half);
    processEvents(events.subList(0, half));
    processEvents(events.subList(half, events.size()));
}
```
The recursive splitting logic has no depth limit or minimum batch size check. If a single event in the batch causes the "batch too large" error (which shouldn't happen but could if the logic is triggered incorrectly), this will result in infinite recursion when the batch is split down to size 1 and processEvents with size 1 still somehow triggers the error. Consider adding a guard condition or maximum recursion depth.
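A minimal sketch of the guard the reviewer suggests, written against a hypothetical `BatchExecutor` stand-in rather than the PR's actual `processEvents` signature: once a batch can no longer be halved, rethrow instead of recursing (with a one-element list, `half` becomes 0, so one sub-list is empty and the other is the original list again — infinite recursion).

```java
import java.util.List;

public class BatchSplitter {

    // Hypothetical stand-in for the code path that executes one batch.
    interface BatchExecutor<T> {
        void execute(List<T> events);
    }

    // Halve-and-retry like the PR, but with a guard: a batch of size 1
    // cannot be split further, so fail fast instead of recursing forever.
    static <T> void processWithSplit(List<T> events, BatchExecutor<T> executor) {
        try {
            executor.execute(events);
        } catch (RuntimeException batchTooLarge) {
            if (events.size() <= 1) {
                // A single event that is still "too large" cannot be helped
                // by splitting; surface the error to the caller.
                throw batchTooLarge;
            }
            int half = events.size() / 2;
            processWithSplit(events.subList(0, half), executor);
            processWithSplit(events.subList(half, events.size()), executor);
        }
    }
}
```

The guard bounds the recursion at log2(n) depth in the worst case and turns the pathological single-event failure into an ordinary exception.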
@copilot open a new pull request to apply changes based on this feedback
```java
processor.process(List.of(event, event));

verify(cqlSession, times(1)).execute(any(BatchStatement.class));
verify(insertStatement, times(4)).bind("key1", "key2", "actorId", ByteBuffer.wrap(new byte[1024]));
```
The test verifies that insertStatement.bind() is called 4 times for 2 events. However, this appears to be testing implementation details rather than behavior. The bind count includes both the failed batch attempt (2 binds) and the individual statement retries (2 more binds). This test may be brittle and could break if the implementation changes while still maintaining correct behavior.
```java
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Statement;
import com.datastax.oss.driver.api.core.servererrors.*;
import com.datastax.oss.protocol.internal.request.Batch;
```
The import com.datastax.oss.protocol.internal.request.Batch is unused and should be removed. This import does not appear to be referenced anywhere in the code.
```diff
- import com.datastax.oss.protocol.internal.request.Batch;
```
...ssandra2/src/main/java/org/elasticsoftware/elasticactors/cassandra2/util/ExecutionUtils.java
```java
} catch(InvalidQueryException e) {
    logger.error("InvalidQueryException with message {} on node {} while executing statement, will retry in case of BatchStatement",
        e.getMessage(),
        Optional.of(e.getCoordinator()).map(node -> node.getEndPoint().resolve().toString()).orElse("UNKNOWN"));
    if(statement instanceof BatchStatement batch) {
        throw new BatchTooLargeException(batch, batch.computeSizeInBytes(cassandraSession.getContext()));
```
The code assumes all InvalidQueryException instances when executing a BatchStatement indicate that the batch is too large. However, InvalidQueryException can be thrown for many other reasons (syntax errors, invalid table names, etc.). The code should check the exception message to confirm it's actually a "batch too large" error before converting it to BatchTooLargeException.
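A sketch of the message check the reviewer is asking for. The exact wording varies by Cassandra version, so the matched substrings here are assumptions, not a stable API contract, and `looksLikeBatchTooLarge` is a hypothetical helper name:

```java
public class BatchErrorClassifier {

    // Heuristic: only treat the failure as a batch-size problem when the
    // server message actually says so; anything else (syntax error, unknown
    // table, ...) should be rethrown unchanged by the caller.
    static boolean looksLikeBatchTooLarge(String message) {
        if (message == null) {
            return false;
        }
        String lower = message.toLowerCase();
        // Cassandra typically reports e.g. "Batch too large" or a message
        // noting the batch "exceeds specified threshold" (assumed wording).
        return lower.contains("batch too large")
            || lower.contains("exceeds specified threshold");
    }
}
```

With such a helper, the `catch` block could convert to `BatchTooLargeException` only when `looksLikeBatchTooLarge(e.getMessage())` holds, and rethrow the original exception otherwise.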
```java
private final int batchSize;

public BatchTooLargeException(BatchStatement originalBatch, int batchSize) {
    super("BatchStatement of size "+batchSize+" too large to execute");
```
Missing space in the error message. Should be: "BatchStatement of size " + batchSize + " too large to execute" (with a space before "too").
```java
} catch (BatchTooLargeException e) {
    int half = events.size() / 2;
    // batch is too large, so we need to split it up
    logger.warn(
        "Batch of byteSize {} is too large, splitting up in 2 batches. 1 of {} events and 1 of {} events",
        e.getBatchSize(),
        half,
        events.size() - half);
    processEvents(events.subList(0, half));
    processEvents(events.subList(half, events.size()));
}
```
The recursive splitting logic has no depth limit or minimum batch size check. If a single event in the batch causes the "batch too large" error (which shouldn't happen but could if the logic is triggered incorrectly), this will result in infinite recursion when the batch is split down to size 1 and processEvents with size 1 still somehow triggers the error. Consider adding a guard condition or maximum recursion depth.
```java
processor.process(List.of(event, event, event));

verify(cqlSession, times(2)).execute(any(BatchStatement.class));
verify(insertStatement, times(6)).bind("key1", "key2", "actorId", ByteBuffer.wrap(new byte[1024]));
```
The test verifies that insertStatement.bind() is called 6 times for 3 events. However, this appears to be testing implementation details rather than behavior. This test may be brittle and could break if the implementation changes while still maintaining correct behavior. Consider focusing on verifying that all events are successfully processed rather than counting internal method calls.
@copilot open a new pull request to apply changes based on this feedback
```java
processor.process(List.of(event, event, event, event, event, event, event, event, event));

verify(cqlSession, times(3)).execute(any(BatchStatement.class));
verify(insertStatement, times(18)).bind("key1", "key2", "actorId", ByteBuffer.wrap(new byte[1024]));
```
The test verifies that insertStatement.bind() is called 18 times for 9 events. However, this appears to be testing implementation details rather than behavior. This test may be brittle and could break if the implementation changes while still maintaining correct behavior. Consider focusing on verifying that all events are successfully processed rather than counting internal method calls.
@copilot open a new pull request to apply changes based on this feedback
…asticactors/cassandra2/util/ExecutionUtils.java

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Initial plan
* Refactor test to focus on behavior rather than implementation details

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: jwijgerd <914840+jwijgerd@users.noreply.github.com>

when receiving an InvalidQueryException while executing a BatchStatement

work in progress.