Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
## Unreleased

* Adding rewind client API ([#253](https://github.com/microsoft/durabletask-java/pull/253))

## v1.6.2
* Fixing gRPC channel shutdown ([#249](https://github.com/microsoft/durabletask-java/pull/249))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ public class HttpManagementPayload {
private final String id;
private final String purgeHistoryDeleteUri;
private final String restartPostUri;
private final String rewindPostUri;
private final String sendEventPostUri;
private final String statusQueryGetUri;
private final String terminatePostUri;
Expand All @@ -33,6 +34,7 @@ public HttpManagementPayload(
this.id = instanceId;
this.purgeHistoryDeleteUri = instanceStatusURL + "?" + requiredQueryStringParameters;
this.restartPostUri = instanceStatusURL + "/restart?" + requiredQueryStringParameters;
this.rewindPostUri = instanceStatusURL + "/rewind?reason={text}&" + requiredQueryStringParameters;
this.sendEventPostUri = instanceStatusURL + "/raiseEvent/{eventName}?" + requiredQueryStringParameters;
this.statusQueryGetUri = instanceStatusURL + "?" + requiredQueryStringParameters;
this.terminatePostUri = instanceStatusURL + "/terminate?reason={text}&" + requiredQueryStringParameters;
Expand Down Expand Up @@ -94,4 +96,13 @@ public String getRestartPostUri() {
return restartPostUri;
}

/**
* Gets the HTTP POST instance rewind endpoint.
*
* @return The HTTP URL for posting instance rewind commands.
*/
public String getRewindPostUri() {
return rewindPostUri;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,31 @@ public abstract OrchestrationMetadata waitForInstanceCompletion(
*/
public abstract String restartInstance(String instanceId, boolean restartWithNewInstanceId);

/**
* Rewinds a failed orchestration instance to the last known good state and replays from there.
* <p>
* This method can only be used on orchestration instances that are in a <code>Failed</code> state.
* When rewound, the orchestration instance will restart from the point of failure as if the failure
* never occurred.
*
* @param instanceId the ID of the orchestration instance to rewind
*/
public void rewindInstance(String instanceId) {
this.rewindInstance(instanceId, null);
}

/**
* Rewinds a failed orchestration instance to the last known good state and replays from there.
* <p>
* This method can only be used on orchestration instances that are in a <code>Failed</code> state.
* When rewound, the orchestration instance will restart from the point of failure as if the failure
* never occurred.
*
* @param instanceId the ID of the orchestration instance to rewind
* @param reason the reason for rewinding the orchestration instance
*/
public abstract void rewindInstance(String instanceId, @Nullable String reason);

/**
* Suspends a running orchestration instance.
* @param instanceId the ID of the orchestration instance to suspend
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,16 @@ public PurgeResult purgeInstances(PurgeInstanceCriteria purgeInstanceCriteria) t
}
}

@Override
public void rewindInstance(String instanceId, @Nullable String reason) {
Copy link

Copilot AI Jan 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing null check for instanceId parameter. Similar methods in this class like terminate() at line 249 use Helpers.throwIfArgumentNull(instanceId, "instanceId") to validate the parameter. This validation should be added for consistency and to provide a clear error message when null is passed.

Suggested change
public void rewindInstance(String instanceId, @Nullable String reason) {
public void rewindInstance(String instanceId, @Nullable String reason) {
Helpers.throwIfArgumentNull(instanceId, "instanceId");

Copilot uses AI. Check for mistakes.
RewindInstanceRequest.Builder rewindRequestBuilder = RewindInstanceRequest.newBuilder();
rewindRequestBuilder.setInstanceId(instanceId);
if (reason != null) {
rewindRequestBuilder.setReason(StringValue.of(reason));
}
this.sidecarClient.rewindInstance(rewindRequestBuilder.build());
}

@Override
public void suspendInstance(String instanceId, @Nullable String reason) {
SuspendRequest.Builder suspendRequestBuilder = SuspendRequest.newBuilder();
Expand Down
89 changes: 89 additions & 0 deletions endtoendtests/src/main/java/com/functions/RewindTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package com.functions;

import com.microsoft.azure.functions.ExecutionContext;
import com.microsoft.azure.functions.HttpMethod;
import com.microsoft.azure.functions.HttpRequestMessage;
import com.microsoft.azure.functions.HttpResponseMessage;
import com.microsoft.azure.functions.annotation.AuthorizationLevel;
import com.microsoft.azure.functions.annotation.FunctionName;
import com.microsoft.azure.functions.annotation.HttpTrigger;
import com.microsoft.durabletask.DurableTaskClient;
import com.microsoft.durabletask.TaskOrchestrationContext;
import com.microsoft.durabletask.azurefunctions.DurableActivityTrigger;
import com.microsoft.durabletask.azurefunctions.DurableClientContext;
import com.microsoft.durabletask.azurefunctions.DurableClientInput;
import com.microsoft.durabletask.azurefunctions.DurableOrchestrationTrigger;

import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* Sample functions to test the rewind functionality.
* Rewind allows a failed orchestration to be replayed from its last known good state.
*/
public class RewindTest {

// Flag to control whether the activity should fail (first call fails, subsequent calls succeed)
private static final AtomicBoolean shouldFail = new AtomicBoolean(true);

/**
* HTTP trigger to start the rewindable orchestration.
*/
@FunctionName("StartRewindableOrchestration")
public HttpResponseMessage startRewindableOrchestration(
@HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
@DurableClientInput(name = "durableContext") DurableClientContext durableContext,
final ExecutionContext context) {
context.getLogger().info("Starting rewindable orchestration.");

// Reset the failure flag so the first activity call will fail
shouldFail.set(true);

DurableTaskClient client = durableContext.getClient();
String instanceId = client.scheduleNewOrchestrationInstance("RewindableOrchestration");
context.getLogger().info("Created new Java orchestration with instance ID = " + instanceId);
return durableContext.createCheckStatusResponse(request, instanceId);
}

/**
* Orchestration that calls an activity which will fail on the first attempt.
* After rewinding, the orchestration will replay and the activity will succeed.
*/
@FunctionName("RewindableOrchestration")
public String rewindableOrchestration(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
// Call the activity that may fail
String result = ctx.callActivity("FailOnceActivity", "RewindTest", String.class).await();
return result;
}

/**
* Activity that fails on the first call but succeeds on subsequent calls.
* This simulates a transient failure that can be recovered by rewinding.
*/
@FunctionName("FailOnceActivity")
public String failOnceActivity(
@DurableActivityTrigger(name = "input") String input,
final ExecutionContext context) {
if (shouldFail.compareAndSet(true, false)) {
context.getLogger().warning("FailOnceActivity: Simulating failure for input: " + input);
throw new RuntimeException("Simulated transient failure - rewind to retry");
}
context.getLogger().info("FailOnceActivity: Success for input: " + input);
return input + "-rewound-success";
}

/**
* HTTP trigger to reset the failure flag (useful for testing).
*/
@FunctionName("ResetRewindFailureFlag")
public HttpResponseMessage resetRewindFailureFlag(
@HttpTrigger(name = "req", methods = {HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
final ExecutionContext context) {
shouldFail.set(true);
context.getLogger().info("Reset failure flag to true.");
return request.createResponseBuilder(com.microsoft.azure.functions.HttpStatus.OK)
.body("Failure flag reset to true")
.build();
}
}
43 changes: 43 additions & 0 deletions endtoendtests/src/test/java/com/functions/EndToEndTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import io.restassured.http.ContentType;
import io.restassured.path.json.JsonPath;
import io.restassured.response.Response;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
Expand All @@ -26,6 +27,14 @@
@Tag("e2e")
public class EndToEndTests {

@BeforeAll
public static void setup() {
RestAssured.baseURI = "http://localhost";
// Use port 8080 for Docker, 7071 for local func start
String port = System.getenv("FUNCTIONS_PORT");
RestAssured.port = port != null ? Integer.parseInt(port) : 8080;
Copy link

Copilot AI Jan 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Potential uncaught 'java.lang.NumberFormatException'.

Suggested change
RestAssured.port = port != null ? Integer.parseInt(port) : 8080;
int resolvedPort = 8080;
if (port != null && !port.isEmpty()) {
try {
resolvedPort = Integer.parseInt(port);
} catch (NumberFormatException e) {
System.err.println("Invalid FUNCTIONS_PORT value '" + port + "'. Falling back to default port " + resolvedPort + ".");
}
}
RestAssured.port = resolvedPort;

Copilot uses AI. Check for mistakes.
}

@Order(1)
@Test
public void setupHost() {
Expand Down Expand Up @@ -216,6 +225,40 @@ public void suspendResume() throws InterruptedException {
assertTrue(completed);
}

@Test
public void rewindFailedOrchestration() throws InterruptedException {
// Reset the failure flag before starting
post("/api/ResetRewindFailureFlag");

// Start the orchestration - it will fail on the first activity call
String startOrchestrationPath = "/api/StartRewindableOrchestration";
Response response = post(startOrchestrationPath);
JsonPath jsonPath = response.jsonPath();
String statusQueryGetUri = jsonPath.get("statusQueryGetUri");

// Wait for the orchestration to fail
boolean failed = pollingCheck(statusQueryGetUri, "Failed", null, Duration.ofSeconds(10));
assertTrue(failed, "Orchestration should have failed");

// Get the rewind URI and rewind the orchestration
String rewindPostUri = jsonPath.get("rewindPostUri");
rewindPostUri = rewindPostUri.replace("{text}", "Testing rewind functionality");
Response rewindResponse = post(rewindPostUri);
assertEquals(202, rewindResponse.getStatusCode(), "Rewind should return 202 Accepted");

// Wait for the orchestration to complete after rewind
Set<String> continueStates = new HashSet<>();
continueStates.add("Pending");
continueStates.add("Running");
boolean completed = pollingCheck(statusQueryGetUri, "Completed", continueStates, Duration.ofSeconds(15));
assertTrue(completed, "Orchestration should complete after rewind");

// Verify the output contains the expected result
Response statusResponse = get(statusQueryGetUri);
String output = statusResponse.jsonPath().get("output");
assertTrue(output.contains("rewound-success"), "Output should indicate successful rewind: " + output);
}

@Test
public void externalEventDeserializeFail() throws InterruptedException {
String startOrchestrationPath = "api/ExternalEventHttp";
Expand Down
2 changes: 1 addition & 1 deletion internal/durabletask-protobuf/PROTO_SOURCE_COMMIT_HASH
Original file line number Diff line number Diff line change
@@ -1 +1 @@
fbe5bb20835678099fc51a44993ed9b045dee5a6
026329c53fe6363985655857b9ca848ec7238bd2
Loading
Loading