diff --git a/CHANGELOG.md b/CHANGELOG.md
index 87977e6f..e14c2bcf 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,7 @@
## Unreleased
+* Adding rewind client API ([#253](https://github.com/microsoft/durabletask-java/pull/253))
+
## v1.6.2
* Fixing gRPC channel shutdown ([#249](https://github.com/microsoft/durabletask-java/pull/249))
diff --git a/azurefunctions/src/main/java/com/microsoft/durabletask/azurefunctions/HttpManagementPayload.java b/azurefunctions/src/main/java/com/microsoft/durabletask/azurefunctions/HttpManagementPayload.java
index f78c00f9..dace60b7 100644
--- a/azurefunctions/src/main/java/com/microsoft/durabletask/azurefunctions/HttpManagementPayload.java
+++ b/azurefunctions/src/main/java/com/microsoft/durabletask/azurefunctions/HttpManagementPayload.java
@@ -13,6 +13,7 @@ public class HttpManagementPayload {
private final String id;
private final String purgeHistoryDeleteUri;
private final String restartPostUri;
+ private final String rewindPostUri;
private final String sendEventPostUri;
private final String statusQueryGetUri;
private final String terminatePostUri;
@@ -33,6 +34,7 @@ public HttpManagementPayload(
this.id = instanceId;
this.purgeHistoryDeleteUri = instanceStatusURL + "?" + requiredQueryStringParameters;
this.restartPostUri = instanceStatusURL + "/restart?" + requiredQueryStringParameters;
+ this.rewindPostUri = instanceStatusURL + "/rewind?reason={text}&" + requiredQueryStringParameters;
this.sendEventPostUri = instanceStatusURL + "/raiseEvent/{eventName}?" + requiredQueryStringParameters;
this.statusQueryGetUri = instanceStatusURL + "?" + requiredQueryStringParameters;
this.terminatePostUri = instanceStatusURL + "/terminate?reason={text}&" + requiredQueryStringParameters;
@@ -94,4 +96,13 @@ public String getRestartPostUri() {
return restartPostUri;
}
+ /**
+ * Gets the HTTP POST instance rewind endpoint.
+ *
+ * The URL is assembled in the constructor from the instance status URL and still contains the literal
+ * {@code reason={text}} query placeholder; callers are expected to replace {@code {text}} before posting.
+ *
+ * @return The HTTP URL for posting instance rewind commands.
+ */
+ public String getRewindPostUri() {
+ return rewindPostUri;
+ }
+
}
diff --git a/client/src/main/java/com/microsoft/durabletask/DurableTaskClient.java b/client/src/main/java/com/microsoft/durabletask/DurableTaskClient.java
index 4590277f..3b2ca4be 100644
--- a/client/src/main/java/com/microsoft/durabletask/DurableTaskClient.java
+++ b/client/src/main/java/com/microsoft/durabletask/DurableTaskClient.java
@@ -292,6 +292,31 @@ public abstract OrchestrationMetadata waitForInstanceCompletion(
*/
public abstract String restartInstance(String instanceId, boolean restartWithNewInstanceId);
+ /**
+ * Rewinds a failed orchestration instance to the last known good state and replays from there.
+ * <p>
+ * This method can only be used on orchestration instances that are in a Failed state.
+ * When rewound, the orchestration instance will restart from the point of failure as if the failure
+ * never occurred.
+ * <p>
+ * Convenience overload: delegates to {@link #rewindInstance(String, String)} with a {@code null} reason.
+ *
+ * @param instanceId the ID of the orchestration instance to rewind
+ */
+ public void rewindInstance(String instanceId) {
+ this.rewindInstance(instanceId, null);
+ }
+
+ /**
+ * Rewinds a failed orchestration instance to the last known good state and replays from there.
+ * <p>
+ * This method can only be used on orchestration instances that are in a Failed state.
+ * When rewound, the orchestration instance will restart from the point of failure as if the failure
+ * never occurred.
+ *
+ * @param instanceId the ID of the orchestration instance to rewind
+ * @param reason the reason for rewinding the orchestration instance, or {@code null} if none is given
+ */
+ public abstract void rewindInstance(String instanceId, @Nullable String reason);
+
/**
* Suspends a running orchestration instance.
* @param instanceId the ID of the orchestration instance to suspend
diff --git a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java
index 52d072b8..c9d23c55 100644
--- a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java
+++ b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java
@@ -331,6 +331,16 @@ public PurgeResult purgeInstances(PurgeInstanceCriteria purgeInstanceCriteria) t
}
}
+    /**
+     * Builds a rewind request for the given instance and sends it through the sidecar client.
+     *
+     * @param instanceId the ID of the orchestration instance to rewind
+     * @param reason     optional reason; the request's reason field is left unset when this is {@code null}
+     */
+    @Override
+    public void rewindInstance(String instanceId, @Nullable String reason) {
+        RewindInstanceRequest.Builder request = RewindInstanceRequest.newBuilder().setInstanceId(instanceId);
+        if (reason != null) {
+            request.setReason(StringValue.of(reason));
+        }
+        this.sidecarClient.rewindInstance(request.build());
+    }
+
@Override
public void suspendInstance(String instanceId, @Nullable String reason) {
SuspendRequest.Builder suspendRequestBuilder = SuspendRequest.newBuilder();
diff --git a/endtoendtests/src/main/java/com/functions/RewindTest.java b/endtoendtests/src/main/java/com/functions/RewindTest.java
new file mode 100644
index 00000000..71844027
--- /dev/null
+++ b/endtoendtests/src/main/java/com/functions/RewindTest.java
@@ -0,0 +1,89 @@
+package com.functions;
+
+import com.microsoft.azure.functions.ExecutionContext;
+import com.microsoft.azure.functions.HttpMethod;
+import com.microsoft.azure.functions.HttpRequestMessage;
+import com.microsoft.azure.functions.HttpResponseMessage;
+import com.microsoft.azure.functions.annotation.AuthorizationLevel;
+import com.microsoft.azure.functions.annotation.FunctionName;
+import com.microsoft.azure.functions.annotation.HttpTrigger;
+import com.microsoft.durabletask.DurableTaskClient;
+import com.microsoft.durabletask.TaskOrchestrationContext;
+import com.microsoft.durabletask.azurefunctions.DurableActivityTrigger;
+import com.microsoft.durabletask.azurefunctions.DurableClientContext;
+import com.microsoft.durabletask.azurefunctions.DurableClientInput;
+import com.microsoft.durabletask.azurefunctions.DurableOrchestrationTrigger;
+
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Sample functions to test the rewind functionality.
+ * Rewind allows a failed orchestration to be replayed from its last known good state.
+ */
+public class RewindTest {
+
+    // Flag to control whether the activity should fail (first call fails, subsequent calls succeed).
+    // It is static, so every function in this class shares the same flag within one JVM.
+    private static final AtomicBoolean shouldFail = new AtomicBoolean(true);
+
+    /**
+     * HTTP trigger to start the rewindable orchestration.
+     *
+     * @param request        the incoming HTTP request (GET or POST)
+     * @param durableContext the durable client binding used to schedule the orchestration
+     * @param context        the function execution context, used for logging
+     * @return the check-status response created for the new orchestration instance
+     */
+    @FunctionName("StartRewindableOrchestration")
+    public HttpResponseMessage startRewindableOrchestration(
+            @HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
+            @DurableClientInput(name = "durableContext") DurableClientContext durableContext,
+            final ExecutionContext context) {
+        context.getLogger().info("Starting rewindable orchestration.");
+
+        // Reset the failure flag so the first activity call will fail
+        shouldFail.set(true);
+
+        DurableTaskClient client = durableContext.getClient();
+        String instanceId = client.scheduleNewOrchestrationInstance("RewindableOrchestration");
+        context.getLogger().info("Created new Java orchestration with instance ID = " + instanceId);
+        return durableContext.createCheckStatusResponse(request, instanceId);
+    }
+
+    /**
+     * Orchestration that calls an activity which will fail on the first attempt.
+     * After rewinding, the orchestration will replay and the activity will succeed.
+     *
+     * @param ctx the orchestration context
+     * @return the string produced by {@code FailOnceActivity}
+     */
+    @FunctionName("RewindableOrchestration")
+    public String rewindableOrchestration(
+            @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
+        // Call the activity that may fail
+        return ctx.callActivity("FailOnceActivity", "RewindTest", String.class).await();
+    }
+
+    /**
+     * Activity that fails on the first call but succeeds on subsequent calls.
+     * This simulates a transient failure that can be recovered by rewinding.
+     *
+     * @param input   the activity input
+     * @param context the function execution context, used for logging
+     * @return the input with {@code "-rewound-success"} appended
+     * @throws RuntimeException on the first call after the failure flag has been set
+     */
+    @FunctionName("FailOnceActivity")
+    public String failOnceActivity(
+            @DurableActivityTrigger(name = "input") String input,
+            final ExecutionContext context) {
+        // compareAndSet flips the flag atomically, so exactly one call observes "should fail"
+        if (shouldFail.compareAndSet(true, false)) {
+            context.getLogger().warning("FailOnceActivity: Simulating failure for input: " + input);
+            throw new RuntimeException("Simulated transient failure - rewind to retry");
+        }
+        context.getLogger().info("FailOnceActivity: Success for input: " + input);
+        return input + "-rewound-success";
+    }
+
+    /**
+     * HTTP trigger to reset the failure flag (useful for testing).
+     *
+     * @param request the incoming HTTP POST request
+     * @param context the function execution context, used for logging
+     * @return a 200 OK response confirming the reset
+     */
+    @FunctionName("ResetRewindFailureFlag")
+    public HttpResponseMessage resetRewindFailureFlag(
+            @HttpTrigger(name = "req", methods = {HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
+            final ExecutionContext context) {
+        shouldFail.set(true);
+        context.getLogger().info("Reset failure flag to true.");
+        return request.createResponseBuilder(com.microsoft.azure.functions.HttpStatus.OK)
+                .body("Failure flag reset to true")
+                .build();
+    }
+}
diff --git a/endtoendtests/src/test/java/com/functions/EndToEndTests.java b/endtoendtests/src/test/java/com/functions/EndToEndTests.java
index c2d0be02..de824e36 100644
--- a/endtoendtests/src/test/java/com/functions/EndToEndTests.java
+++ b/endtoendtests/src/test/java/com/functions/EndToEndTests.java
@@ -4,6 +4,7 @@
import io.restassured.http.ContentType;
import io.restassured.path.json.JsonPath;
import io.restassured.response.Response;
+import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
@@ -26,6 +27,14 @@
@Tag("e2e")
public class EndToEndTests {
+    /**
+     * Points RestAssured at the Functions host under test.
+     * The port comes from the {@code FUNCTIONS_PORT} environment variable and falls back to 8080
+     * when the variable is unset or blank.
+     */
+    @BeforeAll
+    public static void setup() {
+        RestAssured.baseURI = "http://localhost";
+        // Use port 8080 for Docker, 7071 for local func start
+        String port = System.getenv("FUNCTIONS_PORT");
+        // An exported-but-empty variable would make Integer.parseInt throw; treat it like "unset"
+        boolean hasPort = port != null && !port.trim().isEmpty();
+        RestAssured.port = hasPort ? Integer.parseInt(port.trim()) : 8080;
+    }
+
@Order(1)
@Test
public void setupHost() {
@@ -216,6 +225,40 @@ public void suspendResume() throws InterruptedException {
assertTrue(completed);
}
+    /**
+     * Starts an orchestration whose activity fails once, waits for the Failed status, posts to the
+     * rewind endpoint, and then verifies the orchestration completes with the expected output.
+     */
+    @Test
+    public void rewindFailedOrchestration() throws InterruptedException {
+        // Reset the failure flag before starting
+        post("/api/ResetRewindFailureFlag");
+
+        // Start the orchestration - it will fail on the first activity call
+        String startOrchestrationPath = "/api/StartRewindableOrchestration";
+        Response response = post(startOrchestrationPath);
+        JsonPath jsonPath = response.jsonPath();
+        String statusQueryGetUri = jsonPath.get("statusQueryGetUri");
+
+        // Wait for the orchestration to fail
+        boolean failed = pollingCheck(statusQueryGetUri, "Failed", null, Duration.ofSeconds(10));
+        assertTrue(failed, "Orchestration should have failed");
+
+        // Get the rewind URI and rewind the orchestration
+        String rewindPostUri = jsonPath.get("rewindPostUri");
+        // The management payload carries a literal {text} placeholder for the reason
+        rewindPostUri = rewindPostUri.replace("{text}", "Testing rewind functionality");
+        Response rewindResponse = post(rewindPostUri);
+        assertEquals(202, rewindResponse.getStatusCode(), "Rewind should return 202 Accepted");
+
+        // Wait for the orchestration to complete after rewind
+        Set<String> continueStates = new HashSet<>();
+        continueStates.add("Pending");
+        continueStates.add("Running");
+        boolean completed = pollingCheck(statusQueryGetUri, "Completed", continueStates, Duration.ofSeconds(15));
+        assertTrue(completed, "Orchestration should complete after rewind");
+
+        // Verify the output contains the expected result
+        Response statusResponse = get(statusQueryGetUri);
+        String output = statusResponse.jsonPath().get("output");
+        assertTrue(output.contains("rewound-success"), "Output should indicate successful rewind: " + output);
+    }
+
@Test
public void externalEventDeserializeFail() throws InterruptedException {
String startOrchestrationPath = "api/ExternalEventHttp";
diff --git a/internal/durabletask-protobuf/PROTO_SOURCE_COMMIT_HASH b/internal/durabletask-protobuf/PROTO_SOURCE_COMMIT_HASH
index 3d3f9e98..fdb90d6a 100644
--- a/internal/durabletask-protobuf/PROTO_SOURCE_COMMIT_HASH
+++ b/internal/durabletask-protobuf/PROTO_SOURCE_COMMIT_HASH
@@ -1 +1 @@
-fbe5bb20835678099fc51a44993ed9b045dee5a6
\ No newline at end of file
+026329c53fe6363985655857b9ca848ec7238bd2
\ No newline at end of file
diff --git a/internal/durabletask-protobuf/protos/orchestrator_service.proto b/internal/durabletask-protobuf/protos/orchestrator_service.proto
index 88928c3b..8ef46a4a 100644
--- a/internal/durabletask-protobuf/protos/orchestrator_service.proto
+++ b/internal/durabletask-protobuf/protos/orchestrator_service.proto
@@ -41,6 +41,7 @@ message TaskFailureDetails {
google.protobuf.StringValue stackTrace = 3;
TaskFailureDetails innerFailure = 4;
bool isNonRetriable = 5;
+ map properties = 6;
}
enum OrchestrationStatus {
@@ -95,6 +96,7 @@ message TaskScheduledEvent {
google.protobuf.StringValue version = 2;
google.protobuf.StringValue input = 3;
TraceContext parentTraceContext = 4;
+ map tags = 5;
}
message TaskCompletedEvent {
@@ -113,6 +115,7 @@ message SubOrchestrationInstanceCreatedEvent {
google.protobuf.StringValue version = 3;
google.protobuf.StringValue input = 4;
TraceContext parentTraceContext = 5;
+ map tags = 6;
}
message SubOrchestrationInstanceCompletedEvent {
@@ -192,7 +195,7 @@ message EntityOperationCalledEvent {
}
message EntityLockRequestedEvent {
- string criticalSectionId = 1;
+ string criticalSectionId = 1;
repeated string lockSet = 2;
int32 position = 3;
google.protobuf.StringValue parentInstanceId = 4; // used only within messages, null in histories
@@ -217,7 +220,19 @@ message EntityUnlockSentEvent {
message EntityLockGrantedEvent {
string criticalSectionId = 1;
}
-
+
+// History event recorded when an orchestration execution is rewound
+// (carried as HistoryEvent.executionRewound).
+message ExecutionRewoundEvent {
+    google.protobuf.StringValue reason = 1;
+    google.protobuf.StringValue parentExecutionId = 2; // used only for rewinding suborchestrations, null otherwise
+    google.protobuf.StringValue instanceId = 3; // used only for rewinding suborchestrations, null otherwise
+    TraceContext parentTraceContext = 4; // used only for rewinding suborchestrations, null otherwise
+    google.protobuf.StringValue name = 5; // used by DTS backend only
+    google.protobuf.StringValue version = 6; // used by DTS backend only
+    google.protobuf.StringValue input = 7; // used by DTS backend only
+    ParentInstanceInfo parentInstance = 8; // used by DTS backend only
+    map<string, string> tags = 9; // used by DTS backend only
+}
+
message HistoryEvent {
int32 eventId = 1;
google.protobuf.Timestamp timestamp = 2;
@@ -244,11 +259,12 @@ message HistoryEvent {
ExecutionResumedEvent executionResumed = 22;
EntityOperationSignaledEvent entityOperationSignaled = 23;
EntityOperationCalledEvent entityOperationCalled = 24;
- EntityOperationCompletedEvent entityOperationCompleted = 25;
- EntityOperationFailedEvent entityOperationFailed = 26;
+ EntityOperationCompletedEvent entityOperationCompleted = 25;
+ EntityOperationFailedEvent entityOperationFailed = 26;
EntityLockRequestedEvent entityLockRequested = 27;
EntityLockGrantedEvent entityLockGranted = 28;
EntityUnlockSentEvent entityUnlockSent = 29;
+ ExecutionRewoundEvent executionRewound = 30;
}
}
@@ -256,6 +272,8 @@ message ScheduleTaskAction {
string name = 1;
google.protobuf.StringValue version = 2;
google.protobuf.StringValue input = 3;
+ map tags = 4;
+ TraceContext parentTraceContext = 5;
}
message CreateSubOrchestrationAction {
@@ -263,6 +281,8 @@ message CreateSubOrchestrationAction {
string name = 2;
google.protobuf.StringValue version = 3;
google.protobuf.StringValue input = 4;
+ TraceContext parentTraceContext = 5;
+ map tags = 6;
}
message CreateTimerAction {
@@ -282,6 +302,7 @@ message CompleteOrchestrationAction {
google.protobuf.StringValue newVersion = 4;
repeated HistoryEvent carryoverEvents = 5;
TaskFailureDetails failureDetails = 6;
+ map tags = 7;
}
message TerminateOrchestrationAction {
@@ -312,6 +333,11 @@ message OrchestratorAction {
}
}
+// Span ID and span start time carried on both OrchestratorRequest and OrchestratorResponse.
+// NOTE(review): presumably used to correlate the orchestration's tracing span across replays - confirm.
+message OrchestrationTraceContext {
+ google.protobuf.StringValue spanID = 1;
+ google.protobuf.Timestamp spanStartTime = 2;
+}
+
message OrchestratorRequest {
string instanceId = 1;
google.protobuf.StringValue executionId = 2;
@@ -320,6 +346,8 @@ message OrchestratorRequest {
OrchestratorEntityParameters entityParameters = 5;
bool requiresHistoryStreaming = 6;
map properties = 7;
+
+ OrchestrationTraceContext orchestrationTraceContext = 8;
}
message OrchestratorResponse {
@@ -331,6 +359,17 @@ message OrchestratorResponse {
// The number of work item events that were processed by the orchestrator.
// This field is optional. If not set, the service should assume that the orchestrator processed all events.
google.protobuf.Int32Value numEventsProcessed = 5;
+ OrchestrationTraceContext orchestrationTraceContext = 6;
+
+ // Whether or not a history is required to complete the original OrchestratorRequest and none was provided.
+ bool requiresHistory = 7;
+
+ // True if this is a partial (chunked) completion. The backend must keep the work item open until the final chunk (isPartial=false).
+ bool isPartial = 8;
+
+ // Zero-based position of the current chunk within a chunked completion sequence.
+ // This field is omitted for non-chunked completions.
+ google.protobuf.Int32Value chunkIndex = 9;
}
message CreateInstanceRequest {
@@ -343,6 +382,7 @@ message CreateInstanceRequest {
google.protobuf.StringValue executionId = 7;
map tags = 8;
TraceContext parentTraceContext = 9;
+ google.protobuf.Timestamp requestTime = 10;
}
message OrchestrationIdReusePolicy {
@@ -449,12 +489,28 @@ message QueryInstancesResponse {
google.protobuf.StringValue continuationToken = 2;
}
+// Request message for the ListInstanceIds RPC.
+// NOTE(review): pageSize/lastInstanceKey look like paging controls (lastInstanceKey echoing the value
+// returned in ListInstanceIdsResponse) - confirm against the backend.
+message ListInstanceIdsRequest {
+ repeated OrchestrationStatus runtimeStatus = 1;
+ google.protobuf.Timestamp completedTimeFrom = 2;
+ google.protobuf.Timestamp completedTimeTo = 3;
+ int32 pageSize = 4;
+ google.protobuf.StringValue lastInstanceKey = 5;
+}
+
+// Response message for the ListInstanceIds RPC: the matching instance IDs plus a key
+// that presumably serves as the continuation point for the next page - confirm.
+message ListInstanceIdsResponse {
+ repeated string instanceIds = 1;
+ google.protobuf.StringValue lastInstanceKey = 2;
+}
+
message PurgeInstancesRequest {
oneof request {
string instanceId = 1;
PurgeInstanceFilter purgeInstanceFilter = 2;
+ InstanceBatch instanceBatch = 4;
}
bool recursive = 3;
+ // used in the case when an instanceId is specified to determine if the purge request is for an orchestration (as opposed to an entity)
+ bool isOrchestration = 5;
}
message PurgeInstanceFilter {
@@ -468,6 +524,15 @@ message PurgeInstancesResponse {
google.protobuf.BoolValue isComplete = 2;
}
+// Request message for the RestartInstance RPC.
+message RestartInstanceRequest {
+ string instanceId = 1;
+ bool restartWithNewInstanceId = 2;
+}
+
+// Response message for the RestartInstance RPC.
+// NOTE(review): instanceId is presumably the ID of the restarted instance (new or reused) - confirm.
+message RestartInstanceResponse {
+ string instanceId = 1;
+}
+
message CreateTaskHubRequest {
bool recreateIfExists = 1;
}
@@ -490,10 +555,12 @@ message SignalEntityRequest {
google.protobuf.StringValue input = 3;
string requestId = 4;
google.protobuf.Timestamp scheduledTime = 5;
+ TraceContext parentTraceContext = 6;
+ google.protobuf.Timestamp requestTime = 7;
}
message SignalEntityResponse {
- // no payload
+ // no payload
}
message GetEntityRequest {
@@ -553,6 +620,7 @@ message EntityBatchRequest {
string instanceId = 1;
google.protobuf.StringValue entityState = 2;
repeated OperationRequest operations = 3;
+ map properties = 4;
}
message EntityBatchResult {
@@ -562,6 +630,8 @@ message EntityBatchResult {
TaskFailureDetails failureDetails = 4;
string completionToken = 5;
repeated OperationInfo operationInfos = 6; // used only with DTS
+ // Whether or not an entity state is required to complete the original EntityBatchRequest and none was provided.
+ bool requiresState = 7;
}
message EntityRequest {
@@ -575,6 +645,7 @@ message OperationRequest {
string operation = 1;
string requestId = 2;
google.protobuf.StringValue input = 3;
+ TraceContext traceContext = 4;
}
message OperationResult {
@@ -591,10 +662,14 @@ message OperationInfo {
message OperationResultSuccess {
google.protobuf.StringValue result = 1;
+ google.protobuf.Timestamp startTimeUtc = 2;
+ google.protobuf.Timestamp endTimeUtc = 3;
}
message OperationResultFailure {
TaskFailureDetails failureDetails = 1;
+ google.protobuf.Timestamp startTimeUtc = 2;
+ google.protobuf.Timestamp endTimeUtc = 3;
}
message OperationAction {
@@ -610,6 +685,8 @@ message SendSignalAction {
string name = 2;
google.protobuf.StringValue input = 3;
google.protobuf.Timestamp scheduledTime = 4;
+ google.protobuf.Timestamp requestTime = 5;
+ TraceContext parentTraceContext = 6;
}
message StartNewOrchestrationAction {
@@ -618,6 +695,8 @@ message StartNewOrchestrationAction {
google.protobuf.StringValue version = 3;
google.protobuf.StringValue input = 4;
google.protobuf.Timestamp scheduledTime = 5;
+ google.protobuf.Timestamp requestTime = 6;
+ TraceContext parentTraceContext = 7;
}
message AbandonActivityTaskRequest {
@@ -644,6 +723,17 @@ message AbandonEntityTaskResponse {
// Empty.
}
+// Request message for the SkipGracefulOrchestrationTerminations RPC:
+// the batch of instances to act on and an optional reason.
+message SkipGracefulOrchestrationTerminationsRequest {
+ InstanceBatch instanceBatch = 1;
+ google.protobuf.StringValue reason = 2;
+}
+
+// Response message for the SkipGracefulOrchestrationTerminations RPC.
+message SkipGracefulOrchestrationTerminationsResponse {
+ // Those instances which could not be terminated because they had locked entities at the time of this termination call,
+ // are already in a terminal state (completed, failed, terminated, etc.), are not orchestrations, or do not exist (i.e. have been purged)
+ repeated string unterminatedInstanceIds = 1;
+}
+
service TaskHubSidecarService {
// Sends a hello request to the sidecar service.
rpc Hello(google.protobuf.Empty) returns (google.protobuf.Empty);
@@ -657,18 +747,21 @@ service TaskHubSidecarService {
// Rewinds an orchestration instance to last known good state and replays from there.
rpc RewindInstance(RewindInstanceRequest) returns (RewindInstanceResponse);
+ // Restarts an orchestration instance.
+ rpc RestartInstance(RestartInstanceRequest) returns (RestartInstanceResponse);
+
// Waits for an orchestration instance to reach a running or completion state.
rpc WaitForInstanceStart(GetInstanceRequest) returns (GetInstanceResponse);
-
+
// Waits for an orchestration instance to reach a completion state (completed, failed, terminated, etc.).
rpc WaitForInstanceCompletion(GetInstanceRequest) returns (GetInstanceResponse);
// Raises an event to a running orchestration instance.
rpc RaiseEvent(RaiseEventRequest) returns (RaiseEventResponse);
-
+
// Terminates a running orchestration instance.
rpc TerminateInstance(TerminateRequest) returns (TerminateResponse);
-
+
// Suspends a running orchestration instance.
rpc SuspendInstance(SuspendRequest) returns (SuspendResponse);
@@ -678,6 +771,9 @@ service TaskHubSidecarService {
// rpc DeleteInstance(DeleteInstanceRequest) returns (DeleteInstanceResponse);
rpc QueryInstances(QueryInstancesRequest) returns (QueryInstancesResponse);
+
+ rpc ListInstanceIds(ListInstanceIdsRequest) returns (ListInstanceIdsResponse);
+
rpc PurgeInstances(PurgeInstancesRequest) returns (PurgeInstancesResponse);
rpc GetWorkItems(GetWorkItemsRequest) returns (stream WorkItem);
@@ -714,6 +810,10 @@ service TaskHubSidecarService {
// Abandon an entity work item
rpc AbandonTaskEntityWorkItem(AbandonEntityTaskRequest) returns (AbandonEntityTaskResponse);
+
+ // "Skip" graceful termination of orchestrations by immediately changing their status in storage to "terminated".
+ // Note that a maximum of 500 orchestrations can be terminated at a time using this method.
+ rpc SkipGracefulOrchestrationTerminations(SkipGracefulOrchestrationTerminationsRequest) returns (SkipGracefulOrchestrationTerminationsResponse);
}
message GetWorkItemsRequest {
@@ -732,6 +832,16 @@ enum WorkerCapability {
// When set, the service may return work items without any history events as an optimization.
// It is strongly recommended that all SDKs support this capability.
WORKER_CAPABILITY_HISTORY_STREAMING = 1;
+
+ // Indicates that the worker supports scheduled tasks.
+ // The service may send schedule-triggered orchestration work items,
+ // and the worker must handle them, including the scheduledTime field.
+ WORKER_CAPABILITY_SCHEDULED_TASKS = 2;
+
+ // Signals that the worker can handle large payloads stored externally (e.g., Blob Storage).
+ // Work items may contain URI references instead of inline data, and the worker must fetch them.
+ // This avoids message size limits and reduces network overhead.
+ WORKER_CAPABILITY_LARGE_PAYLOADS = 3;
}
message WorkItem {
@@ -750,7 +860,7 @@ message CompleteTaskResponse {
}
message HealthPing {
- // No payload
+ // No payload
}
message StreamInstanceHistoryRequest {
@@ -764,3 +874,8 @@ message StreamInstanceHistoryRequest {
message HistoryChunk {
repeated HistoryEvent events = 1;
}
+
+// A list of instance IDs, referenced by PurgeInstancesRequest and
+// SkipGracefulOrchestrationTerminationsRequest.
+message InstanceBatch {
+ // A maximum of 500 instance IDs can be provided in this list.
+ repeated string instanceIds = 1;
+}
diff --git a/samples/src/main/java/io/durabletask/samples/RewindPattern.java b/samples/src/main/java/io/durabletask/samples/RewindPattern.java
new file mode 100644
index 00000000..7b2b9775
--- /dev/null
+++ b/samples/src/main/java/io/durabletask/samples/RewindPattern.java
@@ -0,0 +1,131 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+package io.durabletask.samples;
+
+import com.microsoft.durabletask.*;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Sample demonstrating the rewind functionality.
+ * <p>
+ * Rewind allows a failed orchestration to be replayed from its last known good state.
+ * This is useful for recovering from transient failures without losing progress.
+ * <p>
+ * This sample:
+ * <ol>
+ *   <li>Starts an orchestration that calls an activity which fails on the first attempt</li>
+ *   <li>Waits for the orchestration to fail</li>
+ *   <li>Rewinds the orchestration, which replays it from the failure point</li>
+ *   <li>The activity succeeds on retry, and the orchestration completes</li>
+ * </ol>
+ */
+final class RewindPattern {
+
+    // Flag to simulate a transient failure (fails first time, succeeds after)
+    private static final AtomicBoolean shouldFail = new AtomicBoolean(true);
+
+    /**
+     * Runs the sample end to end: starts the worker, schedules the orchestration, waits for it to
+     * fail, rewinds it, and prints the final status and output.
+     *
+     * @param args unused
+     * @throws IOException          declared for callers; propagated from the worker/client if raised
+     * @throws InterruptedException if the thread is interrupted while waiting
+     * @throws TimeoutException     if the orchestration does not reach a terminal state in time
+     */
+    public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {
+        // Create and start the worker
+        final DurableTaskGrpcWorker worker = createTaskHubWorker();
+        worker.start();
+
+        // The client is created inside the try so the worker is stopped even if build() throws,
+        // and try-with-resources closes the client before the worker is shut down.
+        try (DurableTaskClient client = new DurableTaskGrpcClientBuilder().build()) {
+            // Reset the failure flag
+            shouldFail.set(true);
+
+            // Start the orchestration - it will fail on the first activity call
+            String instanceId = client.scheduleNewOrchestrationInstance(
+                    "RewindableOrchestration",
+                    new NewOrchestrationInstanceOptions().setInput("TestInput"));
+            System.out.printf("Started orchestration instance: %s%n", instanceId);
+
+            // Wait for the orchestration to fail
+            System.out.println("Waiting for orchestration to fail...");
+            OrchestrationMetadata failedInstance = client.waitForInstanceCompletion(
+                    instanceId,
+                    Duration.ofSeconds(30),
+                    true);
+
+            System.out.printf("Orchestration status: %s%n", failedInstance.getRuntimeStatus());
+
+            if (failedInstance.getRuntimeStatus() == OrchestrationRuntimeStatus.FAILED) {
+                System.out.println("Orchestration failed as expected. Now rewinding...");
+
+                // Rewind the failed orchestration
+                client.rewindInstance(instanceId, "Rewinding after transient failure");
+                System.out.println("Rewind request sent.");
+
+                // Wait for the orchestration to complete after rewind.
+                // NOTE(review): if the backend applies the rewind asynchronously, this wait may still
+                // observe the previous Failed state and return immediately - confirm against the sidecar.
+                System.out.println("Waiting for orchestration to complete after rewind...");
+                OrchestrationMetadata completedInstance = client.waitForInstanceCompletion(
+                        instanceId,
+                        Duration.ofSeconds(30),
+                        true);
+
+                System.out.printf("Orchestration completed: %s%n", completedInstance.getRuntimeStatus());
+                System.out.printf("Output: %s%n", completedInstance.readOutputAs(String.class));
+            } else {
+                System.out.println("Unexpected status: " + failedInstance.getRuntimeStatus());
+            }
+
+        } finally {
+            // Shutdown the worker
+            worker.stop();
+        }
+    }
+
+    /**
+     * Builds a worker with the sample's orchestration and its fail-once activity registered.
+     *
+     * @return the configured (not yet started) worker
+     */
+    private static DurableTaskGrpcWorker createTaskHubWorker() {
+        DurableTaskGrpcWorkerBuilder builder = new DurableTaskGrpcWorkerBuilder();
+
+        // Register the orchestration
+        builder.addOrchestration(new TaskOrchestrationFactory() {
+            @Override
+            public String getName() {
+                return "RewindableOrchestration";
+            }
+
+            @Override
+            public TaskOrchestration create() {
+                return ctx -> {
+                    String input = ctx.getInput(String.class);
+
+                    // Call an activity that may fail
+                    String result = ctx.callActivity("FailOnceActivity", input, String.class).await();
+
+                    ctx.complete(result);
+                };
+            }
+        });
+
+        // Register the activity that fails on first call
+        builder.addActivity(new TaskActivityFactory() {
+            @Override
+            public String getName() {
+                return "FailOnceActivity";
+            }
+
+            @Override
+            public TaskActivity create() {
+                return ctx -> {
+                    String input = ctx.getInput(String.class);
+
+                    // Fail on the first call, succeed on subsequent calls
+                    if (shouldFail.compareAndSet(true, false)) {
+                        System.out.println("FailOnceActivity: Simulating transient failure...");
+                        throw new RuntimeException("Simulated transient failure - rewind to retry");
+                    }
+
+                    System.out.println("FailOnceActivity: Succeeded after rewind!");
+                    return input + "-rewound-success";
+                };
+            }
+        });
+
+        return builder.build();
+    }
+}