diff --git a/cmd/vsphere/main.go b/cmd/vsphere/main.go index 23f6bc06a..66b6147f6 100644 --- a/cmd/vsphere/main.go +++ b/cmd/vsphere/main.go @@ -165,6 +165,7 @@ func main() { // Create a taskIDCache for create task IDs in case they are lost due to // network error or stale cache. taskIDCache := make(map[string]string) + failedProvStatusUpdate := make(map[string]*machinev1.VSphereMachineProviderStatus) // Initialize machine actuator. machineActuator := machine.NewActuator(machine.ActuatorParams{ @@ -172,6 +173,7 @@ func main() { APIReader: mgr.GetAPIReader(), EventRecorder: mgr.GetEventRecorderFor("vspherecontroller"), TaskIDCache: taskIDCache, + FailedProvStatusUpdate: failedProvStatusUpdate, FeatureGates: defaultMutableGate, OpenshiftConfigNamespace: vsphere.OpenshiftConfigNamespace, }) diff --git a/pkg/controller/vsphere/actuator.go b/pkg/controller/vsphere/actuator.go index 56def7a6f..339db1c62 100644 --- a/pkg/controller/vsphere/actuator.go +++ b/pkg/controller/vsphere/actuator.go @@ -33,6 +33,7 @@ type Actuator struct { apiReader runtimeclient.Reader eventRecorder record.EventRecorder TaskIDCache map[string]string + FailedProvStatusUpdate map[string]*machinev1.VSphereMachineProviderStatus FeatureGates featuregate.MutableFeatureGate openshiftConfigNamespace string } @@ -43,6 +44,7 @@ type ActuatorParams struct { APIReader runtimeclient.Reader EventRecorder record.EventRecorder TaskIDCache map[string]string + FailedProvStatusUpdate map[string]*machinev1.VSphereMachineProviderStatus FeatureGates featuregate.MutableFeatureGate OpenshiftConfigNamespace string } @@ -54,6 +56,7 @@ func NewActuator(params ActuatorParams) *Actuator { apiReader: params.APIReader, eventRecorder: params.EventRecorder, TaskIDCache: params.TaskIDCache, + FailedProvStatusUpdate: params.FailedProvStatusUpdate, FeatureGates: params.FeatureGates, openshiftConfigNamespace: params.OpenshiftConfigNamespace, } @@ -90,8 +93,20 @@ func (a *Actuator) Create(ctx context.Context, machine *machinev1.Machine) error // This is a workaround for a cache race condition. if val, ok := a.TaskIDCache[machine.Name]; ok { if val != scope.providerStatus.TaskRef { - klog.Errorf("%s: machine object missing expected provider task ID, requeue", machine.GetName()) - return &machinecontroller.RequeueAfterError{RequeueAfter: requeueAfterSeconds * time.Second} + if a.FailedProvStatusUpdate[machine.Name] != nil { + // Attempt to update previous status + klog.Infof("Attempting to re-patch machine %s", machine.Name) + scope.providerStatus = a.FailedProvStatusUpdate[machine.Name] + if err := scope.PatchMachine(); err != nil { + // Still not having any luck. Return the error and retry later. + return err + } + // Update worked. Clear out the failed patch info. + delete(a.FailedProvStatusUpdate, machine.Name) + } else { + klog.Errorf("%s: machine object missing expected provider task ID, requeue", machine.GetName()) + return &machinecontroller.RequeueAfterError{RequeueAfter: requeueAfterSeconds * time.Second} + } } } @@ -109,6 +124,8 @@ func (a *Actuator) Create(ctx context.Context, machine *machinev1.Machine) error } if err := scope.PatchMachine(); err != nil { + // An error occurred while saving status fields. Save off and try again later. + a.FailedProvStatusUpdate[scope.machine.Name] = scope.providerStatus return err } @@ -135,6 +152,7 @@ func (a *Actuator) Update(ctx context.Context, machine *machinev1.Machine) error klog.Infof("%s: actuator updating machine", machine.GetName()) // Cleanup TaskIDCache so we don't continually grow delete(a.TaskIDCache, machine.Name) + delete(a.FailedProvStatusUpdate, machine.Name) scope, err := newMachineScope(machineScopeParams{ Context: ctx, @@ -177,6 +195,7 @@ func (a *Actuator) Delete(ctx context.Context, machine *machinev1.Machine) error // Cleanup TaskIDCache so we don't continually grow // Cleanup here as well in case Update() was never successfully called. delete(a.TaskIDCache, machine.Name) + delete(a.FailedProvStatusUpdate, machine.Name) scope, err := newMachineScope(machineScopeParams{ Context: ctx, diff --git a/pkg/controller/vsphere/actuator_test.go b/pkg/controller/vsphere/actuator_test.go index 8193dad59..f6ead510b 100644 --- a/pkg/controller/vsphere/actuator_test.go +++ b/pkg/controller/vsphere/actuator_test.go @@ -24,6 +24,9 @@ import ( "sigs.k8s.io/controller-runtime/pkg/envtest" "sigs.k8s.io/controller-runtime/pkg/manager" metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" + + machinecontroller "github.com/openshift/machine-api-operator/pkg/controller/machine" + apimachineryerrors "k8s.io/apimachinery/pkg/api/errors" ) func init() { @@ -371,11 +374,13 @@ func TestMachineEvents(t *testing.T) { } taskIDCache := make(map[string]string) + failedProvStatusUpdate := make(map[string]*machinev1.VSphereMachineProviderStatus) params := ActuatorParams{ Client: k8sClient, EventRecorder: eventRecorder, APIReader: k8sClient, TaskIDCache: taskIDCache, + FailedProvStatusUpdate: failedProvStatusUpdate, OpenshiftConfigNamespace: openshiftConfigNamespaceForTest, FeatureGates: gate, } @@ -412,3 +417,483 @@ func TestMachineEvents(t *testing.T) { }) } } + +func TestFailedProvStatusRetry(t *testing.T) { + g := NewWithT(t) + + model, session, server := initSimulator(t) + defer model.Remove() + defer server.Close() + + host, port, err := net.SplitHostPort(server.URL.Host) + if err != nil { + t.Fatal(err) + } + + credentialsSecretUsername := fmt.Sprintf("%s.username", host) + credentialsSecretPassword := fmt.Sprintf("%s.password", host) + password, _ := server.URL.User.Password() + + vm := model.Map().Any("VirtualMachine").(*simulator.VirtualMachine) + vm.Config.Version = minimumHWVersionString + + testEnv := &envtest.Environment{ + CRDDirectoryPaths: []string{ + filepath.Join("..", "..", "..", "install"), + filepath.Join("..", "..", "..", "vendor", "github.com", "openshift", "api", "config", "v1", "zz_generated.crd-manifests"), + filepath.Join("..", "..", "..", "third_party", "cluster-api", "crd")}, + } + + cfg, err := testEnv.Start() + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(cfg).ToNot(BeNil()) + defer func() { + g.Expect(testEnv.Stop()).To(Succeed()) + }() + + mgr, err := manager.New(cfg, manager.Options{ + Scheme: scheme.Scheme, + Metrics: metricsserver.Options{ + BindAddress: "0", + }, + }) + if err != nil { + t.Fatal(err) + } + + mgrCtx, cancel := context.WithCancel(context.Background()) + go func() { + g.Expect(mgr.Start(mgrCtx)).To(Succeed()) + }() + defer cancel() + + k8sClient := mgr.GetClient() + eventRecorder := mgr.GetEventRecorderFor("vspherecontroller") + + configNamespace := &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: openshiftConfigNamespaceForTest, + }, + } + g.Expect(k8sClient.Create(context.Background(), configNamespace)).To(Succeed()) + defer func() { + g.Expect(k8sClient.Delete(context.Background(), configNamespace)).To(Succeed()) + }() + + testNamespaceName := "test" + testNamespace := &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: testNamespaceName, + }, + } + g.Expect(k8sClient.Create(context.Background(), testNamespace)).To(Succeed()) + defer func() { + g.Expect(k8sClient.Delete(context.Background(), testNamespace)).To(Succeed()) + }() + + credentialsSecretName := "test" + credentialsSecret := corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: credentialsSecretName, + Namespace: testNamespaceName, + }, + Data: map[string][]byte{ + credentialsSecretUsername: []byte(server.URL.User.Username()), + credentialsSecretPassword: []byte(password), + }, + } + g.Expect(k8sClient.Create(context.Background(), &credentialsSecret)).To(Succeed()) + defer func() { + g.Expect(k8sClient.Delete(context.Background(), &credentialsSecret)).To(Succeed()) + }() + + testConfig := fmt.Sprintf(testConfigFmt, port, credentialsSecretName, testNamespaceName) + configMap := &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testname", + Namespace: openshiftConfigNamespaceForTest, + }, + Data: map[string]string{ + "testkey": testConfig, + }, + } + g.Expect(k8sClient.Create(context.Background(), configMap)).To(Succeed()) + defer func() { + g.Expect(k8sClient.Delete(context.Background(), configMap)).To(Succeed()) + }() + + infra := &configv1.Infrastructure{ + ObjectMeta: metav1.ObjectMeta{ + Name: globalInfrastuctureName, + }, + Spec: configv1.InfrastructureSpec{ + CloudConfig: configv1.ConfigMapFileReference{ + Name: "testname", + Key: "testkey", + }, + }, + } + g.Expect(k8sClient.Create(context.Background(), infra)).To(Succeed()) + defer func() { + g.Expect(k8sClient.Delete(context.Background(), infra)).To(Succeed()) + }() + + userDataSecretName := "vsphere-ignition" + userDataSecret := &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: userDataSecretName, + Namespace: testNamespaceName, + }, + Data: map[string][]byte{ + userDataSecretKey: []byte("{}"), + }, + } + g.Expect(k8sClient.Create(context.Background(), userDataSecret)).To(Succeed()) + defer func() { + g.Expect(k8sClient.Delete(context.Background(), userDataSecret)).To(Succeed()) + }() + + _, err = createTagAndCategory(session, tagToCategoryName("CLUSTERID"), "CLUSTERID") + g.Expect(err).ToNot(HaveOccurred()) + + ctx := context.Background() + timeout := 10 * time.Second + + providerSpec, err := RawExtensionFromProviderSpec(&machinev1.VSphereMachineProviderSpec{ + Template: vm.Name, + Workspace: &machinev1.Workspace{ + Server: host, + }, + CredentialsSecret: &corev1.LocalObjectReference{ + Name: "test", + }, + UserDataSecret: &corev1.LocalObjectReference{ + Name: userDataSecretName, + }, + DiskGiB: 10, + }) + g.Expect(err).ToNot(HaveOccurred()) + + node := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "test", + Labels: map[string]string{ + machinev1.MachineClusterIDLabel: "CLUSTERID", + }, + }, + Spec: corev1.NodeSpec{}, + Status: corev1.NodeStatus{ + VolumesAttached: []corev1.AttachedVolume{}, + }, + } + g.Expect(k8sClient.Create(ctx, node)).To(Succeed()) + defer func() { + g.Expect(k8sClient.Delete(ctx, node)).To(Succeed()) + }() + getNode := func() error { + nodeKey := types.NamespacedName{Name: node.Name} + return k8sClient.Get(ctx, nodeKey, &corev1.Node{}) + } + g.Eventually(getNode, timeout).Should(Succeed()) + + gate, err := testutils.NewDefaultMutableFeatureGate() + if err != nil { + t.Fatal(err) + } + + t.Run("PatchMachine failure saves provider status and retry succeeds", func(t *testing.T) { + gs := NewWithT(t) + + machineName := "test-retry-success" + machine := &machinev1.Machine{ + ObjectMeta: metav1.ObjectMeta{ + Name: machineName, + Namespace: testNamespaceName, + Labels: map[string]string{ + machinev1.MachineClusterIDLabel: "CLUSTERID", + }, + }, + Spec: machinev1.MachineSpec{ + ProviderSpec: machinev1.ProviderSpec{ + Value: providerSpec, + }, + }, + Status: machinev1.MachineStatus{ + NodeRef: &corev1.ObjectReference{ + Name: "test", + }, + }, + } + + gs.Expect(k8sClient.Create(ctx, machine)).To(Succeed()) + defer func() { + // Clean up if machine still exists + if err := k8sClient.Delete(ctx, machine); err != nil && !apimachineryerrors.IsNotFound(err) { + t.Logf("cleanup: %v", err) + } + }() + + getMachine := func() error { + machineKey := types.NamespacedName{Namespace: machine.Namespace, Name: machine.Name} + return k8sClient.Get(ctx, machineKey, &machinev1.Machine{}) + } + gs.Eventually(getMachine, timeout).Should(Succeed()) + + taskIDCache := make(map[string]string) + failedProvStatusUpdate := make(map[string]*machinev1.VSphereMachineProviderStatus) + params := ActuatorParams{ + Client: k8sClient, + EventRecorder: eventRecorder, + APIReader: k8sClient, + TaskIDCache: taskIDCache, + FailedProvStatusUpdate: failedProvStatusUpdate, + OpenshiftConfigNamespace: openshiftConfigNamespaceForTest, + FeatureGates: gate, + } + actuator := NewActuator(params) + + // Step 1: Create successfully - the cache should be populated with the task ID. + err := actuator.Create(ctx, machine) + gs.Expect(err).ToNot(HaveOccurred()) + gs.Expect(taskIDCache).To(HaveKey(machineName)) + cachedTaskID := taskIDCache[machineName] + gs.Expect(cachedTaskID).ToNot(BeEmpty()) + + // Step 2: Simulate a PatchMachine failure by deleting the machine from the API, + // then calling Create again. The in-memory machine still has the old TaskRef, + // so the reconciler runs and sets a new TaskRef. But PatchMachine fails because + // the machine no longer exists in the API. + gs.Expect(k8sClient.Delete(ctx, machine)).To(Succeed()) + gs.Eventually(func() bool { + machineKey := types.NamespacedName{Namespace: machine.Namespace, Name: machine.Name} + return k8sClient.Get(ctx, machineKey, &machinev1.Machine{}) != nil + }, timeout).Should(BeTrue()) + + err = actuator.Create(ctx, machine) + gs.Expect(err).To(HaveOccurred()) + + // Verify the failed provider status was cached for retry. + gs.Expect(failedProvStatusUpdate).To(HaveKey(machineName)) + gs.Expect(failedProvStatusUpdate[machineName]).ToNot(BeNil()) + gs.Expect(failedProvStatusUpdate[machineName].TaskRef).ToNot(BeEmpty()) + + // Step 3: Re-create the machine in the API so PatchMachine can succeed on retry. + // Reset the in-memory machine to have a stale (empty) TaskRef so + // the stale check triggers the retry path. + newMachine := &machinev1.Machine{ + ObjectMeta: metav1.ObjectMeta{ + Name: machineName, + Namespace: testNamespaceName, + Labels: map[string]string{ + machinev1.MachineClusterIDLabel: "CLUSTERID", + }, + }, + Spec: machinev1.MachineSpec{ + ProviderSpec: machinev1.ProviderSpec{ + Value: providerSpec, + }, + }, + Status: machinev1.MachineStatus{ + NodeRef: &corev1.ObjectReference{ + Name: "test", + }, + }, + } + gs.Expect(k8sClient.Create(ctx, newMachine)).To(Succeed()) + gs.Eventually(func() error { + machineKey := types.NamespacedName{Namespace: newMachine.Namespace, Name: newMachine.Name} + return k8sClient.Get(ctx, machineKey, &machinev1.Machine{}) + }, timeout).Should(Succeed()) + + // Call Create with the new machine (which has empty TaskRef, mismatching cache). + // The retry logic should re-patch the saved status and clear FailedProvStatusUpdate. + err = actuator.Create(ctx, newMachine) + gs.Expect(err).ToNot(HaveOccurred()) + gs.Expect(failedProvStatusUpdate).ToNot(HaveKey(machineName)) + }) + + t.Run("Stale TaskRef without saved status returns requeue error", func(t *testing.T) { + gs := NewWithT(t) + + machineName := "test-stale-requeue" + machine := &machinev1.Machine{ + ObjectMeta: metav1.ObjectMeta{ + Name: machineName, + Namespace: testNamespaceName, + Labels: map[string]string{ + machinev1.MachineClusterIDLabel: "CLUSTERID", + }, + }, + Spec: machinev1.MachineSpec{ + ProviderSpec: machinev1.ProviderSpec{ + Value: providerSpec, + }, + }, + Status: machinev1.MachineStatus{ + NodeRef: &corev1.ObjectReference{ + Name: "test", + }, + }, + } + + gs.Expect(k8sClient.Create(ctx, machine)).To(Succeed()) + defer func() { + if err := k8sClient.Delete(ctx, machine); err != nil && !apimachineryerrors.IsNotFound(err) { + t.Logf("cleanup: %v", err) + } + }() + + getMachine := func() error { + machineKey := types.NamespacedName{Namespace: machine.Namespace, Name: machine.Name} + return k8sClient.Get(ctx, machineKey, &machinev1.Machine{}) + } + gs.Eventually(getMachine, timeout).Should(Succeed()) + + // Pre-populate TaskIDCache with a value that won't match the machine's empty TaskRef. + taskIDCache := map[string]string{ + machineName: "some-old-task-id", + } + failedProvStatusUpdate := make(map[string]*machinev1.VSphereMachineProviderStatus) + params := ActuatorParams{ + Client: k8sClient, + EventRecorder: eventRecorder, + APIReader: k8sClient, + TaskIDCache: taskIDCache, + FailedProvStatusUpdate: failedProvStatusUpdate, + OpenshiftConfigNamespace: openshiftConfigNamespaceForTest, + FeatureGates: gate, + } + actuator := NewActuator(params) + + // No FailedProvStatusUpdate entry, so this should return a RequeueAfterError. + err := actuator.Create(ctx, machine) + gs.Expect(err).To(HaveOccurred()) + + _, isRequeue := err.(*machinecontroller.RequeueAfterError) + gs.Expect(isRequeue).To(BeTrue()) + }) + + t.Run("Update cleans up FailedProvStatusUpdate", func(t *testing.T) { + gs := NewWithT(t) + + machineName := "test-update-cleanup" + machine := &machinev1.Machine{ + ObjectMeta: metav1.ObjectMeta{ + Name: machineName, + Namespace: testNamespaceName, + Labels: map[string]string{ + machinev1.MachineClusterIDLabel: "CLUSTERID", + }, + }, + Spec: machinev1.MachineSpec{ + ProviderSpec: machinev1.ProviderSpec{ + Value: providerSpec, + }, + }, + Status: machinev1.MachineStatus{ + NodeRef: &corev1.ObjectReference{ + Name: "test", + }, + }, + } + + gs.Expect(k8sClient.Create(ctx, machine)).To(Succeed()) + defer func() { + if err := k8sClient.Delete(ctx, machine); err != nil && !apimachineryerrors.IsNotFound(err) { + t.Logf("cleanup: %v", err) + } + }() + + getMachine := func() error { + machineKey := types.NamespacedName{Namespace: machine.Namespace, Name: machine.Name} + return k8sClient.Get(ctx, machineKey, &machinev1.Machine{}) + } + gs.Eventually(getMachine, timeout).Should(Succeed()) + + taskIDCache := map[string]string{ + machineName: "some-task-id", + } + failedProvStatusUpdate := map[string]*machinev1.VSphereMachineProviderStatus{ + machineName: {TaskRef: "some-task-id"}, + } + params := ActuatorParams{ + Client: k8sClient, + EventRecorder: eventRecorder, + APIReader: k8sClient, + TaskIDCache: taskIDCache, + FailedProvStatusUpdate: failedProvStatusUpdate, + OpenshiftConfigNamespace: openshiftConfigNamespaceForTest, + FeatureGates: gate, + } + actuator := NewActuator(params) + + err := actuator.Update(ctx, machine) + gs.Expect(err).ToNot(HaveOccurred()) + + gs.Expect(failedProvStatusUpdate).ToNot(HaveKey(machineName)) + gs.Expect(taskIDCache).ToNot(HaveKey(machineName)) + }) + + t.Run("Delete cleans up FailedProvStatusUpdate", func(t *testing.T) { + gs := NewWithT(t) + + machineName := "test-delete-cleanup" + machine := &machinev1.Machine{ + ObjectMeta: metav1.ObjectMeta{ + Name: machineName, + Namespace: testNamespaceName, + Labels: map[string]string{ + machinev1.MachineClusterIDLabel: "CLUSTERID", + }, + }, + Spec: machinev1.MachineSpec{ + ProviderSpec: machinev1.ProviderSpec{ + Value: providerSpec, + }, + }, + Status: machinev1.MachineStatus{ + NodeRef: &corev1.ObjectReference{ + Name: "test", + }, + }, + } + + gs.Expect(k8sClient.Create(ctx, machine)).To(Succeed()) + defer func() { + if err := k8sClient.Delete(ctx, machine); err != nil && !apimachineryerrors.IsNotFound(err) { + t.Logf("cleanup: %v", err) + } + }() + + getMachine := func() error { + machineKey := types.NamespacedName{Namespace: machine.Namespace, Name: machine.Name} + return k8sClient.Get(ctx, machineKey, &machinev1.Machine{}) + } + gs.Eventually(getMachine, timeout).Should(Succeed()) + + taskIDCache := map[string]string{ + machineName: "some-task-id", + } + failedProvStatusUpdate := map[string]*machinev1.VSphereMachineProviderStatus{ + machineName: {TaskRef: "some-task-id"}, + } + params := ActuatorParams{ + Client: k8sClient, + EventRecorder: eventRecorder, + APIReader: k8sClient, + TaskIDCache: taskIDCache, + FailedProvStatusUpdate: failedProvStatusUpdate, + OpenshiftConfigNamespace: openshiftConfigNamespaceForTest, + FeatureGates: gate, + } + actuator := NewActuator(params) + + // Delete will fail (vm doesn't exist in vsphere sim for this machine) + // but it should still clean up the caches before proceeding. + _ = actuator.Delete(ctx, machine) + + gs.Expect(failedProvStatusUpdate).ToNot(HaveKey(machineName)) + gs.Expect(taskIDCache).ToNot(HaveKey(machineName)) + }) +}