diff --git a/src/azure-cli/azure/cli/command_modules/keyvault/_help.py b/src/azure-cli/azure/cli/command_modules/keyvault/_help.py index 525b4c5b159..7fc77f3465a 100644 --- a/src/azure-cli/azure/cli/command_modules/keyvault/_help.py +++ b/src/azure-cli/azure/cli/command_modules/keyvault/_help.py @@ -906,6 +906,20 @@ the secret will be downloaded. This operation requires the secrets/backup permission. """ +helps['keyvault secret copy'] = """ +type: command +short-summary: Copy a secret from one Key Vault to another. +long-summary: Copies the latest version of a secret from a source Key Vault to a destination Key Vault. + This operation copies the secret value and its metadata (tags, content-type, attributes). +examples: + - name: Copy a specific secret from one vault to another. + text: az keyvault secret copy --source-vault SourceVault --destination-vault DestVault --name MySecret + - name: Copy all secrets from one vault to another. + text: az keyvault secret copy --source-vault SourceVault --destination-vault DestVault --all + - name: Copy a secret and overwrite if it already exists in the destination. + text: az keyvault secret copy --source-vault SourceVault --destination-vault DestVault --name MySecret --overwrite +""" + helps['keyvault secret restore'] = """ type: command short-summary: Restores a backed up secret to a vault. diff --git a/src/azure-cli/azure/cli/command_modules/keyvault/_params.py b/src/azure-cli/azure/cli/command_modules/keyvault/_params.py index 5718281f2b4..9b6a05ee807 100644 --- a/src/azure-cli/azure/cli/command_modules/keyvault/_params.py +++ b/src/azure-cli/azure/cli/command_modules/keyvault/_params.py @@ -568,6 +568,20 @@ class CLISecurityDomainOperation(str, Enum): with self.argument_context('keyvault secret restore') as c: c.extra('vault_base_url', vault_name_type, required=True, arg_group='Id', type=get_vault_base_url_type(self.cli_ctx), id_part=None) + + with self.argument_context('keyvault secret copy') as c: + c.extra('vault_base_url', vault_name_type, type=get_vault_base_url_type(self.cli_ctx), + options_list=['--source-vault'], help='Name of the source Key Vault.', required=True) + c.extra('destination_vault', vault_name_type, type=get_vault_base_url_type(self.cli_ctx), + options_list=['--destination-vault'], help='Name of the destination Key Vault.', required=True) + c.argument('name', options_list=['--name', '-n'], + help='Name of the secret to copy. Mutually exclusive with --all. If neither --name nor --all is ' + 'specified, all secrets will be copied.', + required=False) + c.extra('all_secrets', arg_type=get_three_state_flag(), options_list=['--all'], + help='Copy all secrets from the source vault. Mutually exclusive with --name. If neither --name nor ' + '--all is specified, all secrets will be copied.') + c.extra('overwrite', arg_type=get_three_state_flag(), help='Overwrite secrets in the destination vault if they already exist.') # endregion # region keyvault security-domain diff --git a/src/azure-cli/azure/cli/command_modules/keyvault/commands.py b/src/azure-cli/azure/cli/command_modules/keyvault/commands.py index c70ae0b600c..ca5a2085e41 100644 --- a/src/azure-cli/azure/cli/command_modules/keyvault/commands.py +++ b/src/azure-cli/azure/cli/command_modules/keyvault/commands.py @@ -204,6 +204,7 @@ def load_command_table(self, _): g.keyvault_custom('download', 'download_secret') g.keyvault_custom('backup', 'backup_secret') g.keyvault_custom('restore', 'restore_secret', transform=transform_secret_set_attributes) + g.keyvault_custom('copy', 'copy_secret') # certificate track2 with self.command_group('keyvault certificate', data_certificate_entity.command_type) as g: diff --git a/src/azure-cli/azure/cli/command_modules/keyvault/custom.py b/src/azure-cli/azure/cli/command_modules/keyvault/custom.py index 9cc5f7e3916..f94e5a2ff1e 100644 --- a/src/azure-cli/azure/cli/command_modules/keyvault/custom.py +++ b/src/azure-cli/azure/cli/command_modules/keyvault/custom.py @@ -2517,3 +2517,120 @@ def set_attributes_certificate(client, certificate_name, version=None, policy=No if kwargs.get('enabled') is not None or kwargs.get('tags') is not None: return client.update_certificate_properties(certificate_name=certificate_name, version=version, **kwargs) return client.get_certificate(certificate_name=certificate_name) + + +def _copy_single_secret(source_client, dest_client, secret_name, overwrite, is_single_mode): + from azure.core.exceptions import ResourceNotFoundError, HttpResponseError + + try: + # Check destination + if not overwrite: + try: + dest_client.get_secret(secret_name) + logger.warning("Secret '%s' already exists in destination. Skipping.", secret_name) + return None # Skipped + except ResourceNotFoundError: + pass + except HttpResponseError as e: + logger.warning("Error checking secret '%s' in destination: %s", secret_name, str(e)) + return False # Failed + + # Copy + logger.info("Copying secret: %s", secret_name) + s = source_client.get_secret(secret_name) + + try: + new_secret = dest_client.set_secret( + s.name, + s.value, + content_type=s.properties.content_type, + tags=s.properties.tags, + enabled=s.properties.enabled, + not_before=s.properties.not_before, + expires_on=s.properties.expires_on + ) + except HttpResponseError as e: + if is_single_mode: + raise e + + logger.error("Failed to copy secret '%s': %s", secret_name, str(e)) + return False + + logger.info("Successfully copied secret: %s", secret_name) + return {'name': new_secret.name, 'id': new_secret.id} + + except ResourceNotFoundError as e: + if is_single_mode: + raise e + logger.error("Secret '%s' not found in source vault.", secret_name) + return False + except HttpResponseError as e: + if is_single_mode: + raise e + logger.error("Failed to copy secret '%s': %s", secret_name, str(e)) + return False + + +def copy_secret(cmd, client, destination_vault, name=None, all_secrets=None, overwrite=False): + from azure.core.exceptions import ResourceNotFoundError, HttpResponseError + from azure.cli.command_modules.keyvault._client_factory import data_plane_azure_keyvault_secret_client + + # If neither a specific secret name nor --all is provided, default to copying all secrets. + if not name and not all_secrets: + all_secrets = True + + # A specific secret name and --all are mutually exclusive. + if name and all_secrets: + raise MutuallyExclusiveArgumentError("Specify either a secret name or --all, but not both.") + # Validation + if client.vault_url.rstrip('/') == destination_vault.rstrip('/'): + raise CLIError("Source and destination Key Vaults cannot be the same.") + + command_args = {'vault_base_url': destination_vault} + dest_client = data_plane_azure_keyvault_secret_client(cmd.cli_ctx, command_args) + + # Fail fast if source or destination vault is not accessible or does not exist + for c, vault_url in [(client, client.vault_url), (dest_client, destination_vault)]: + try: + # Perform a lightweight call to validate vault accessibility. + # A 404 for a dummy secret name means the vault is reachable but the secret does not exist. + c.get_secret("azure-cli-validation-dummy") + except ResourceNotFoundError: + # Vault is accessible but the dummy secret does not exist, which is expected. + pass + except HttpResponseError as e: + if e.status_code == 404: + # Vault is accessible but the dummy secret does not exist, which is expected. + pass + else: + raise CLIError(f"Failed to access Key Vault '{vault_url}': {str(e)}") + + secrets_to_copy = [] + if name: + secrets_to_copy.append(name) + else: + logger.info("Copying all secrets from source...") + try: + source_secrets = client.list_properties_of_secrets() + for s in source_secrets: + if s.managed: + logger.warning("Skipping managed secret: %s", s.name) + continue + secrets_to_copy.append(s.name) + except HttpResponseError as e: + raise CLIError(f"Failed to list secrets from source: {str(e)}") + + copied_secrets = [] + failed_secrets = [] + for secret_name in secrets_to_copy: + result = _copy_single_secret(client, dest_client, secret_name, overwrite, bool(name)) + if result: + copied_secrets.append(result) + elif result is False: + failed_secrets.append(secret_name) + + if failed_secrets: + logger.warning("Operation completed with failures. %s secrets failed to copy: %s", + len(failed_secrets), ', '.join(failed_secrets)) + + return copied_secrets diff --git a/src/azure-cli/azure/cli/command_modules/keyvault/tests/latest/test_keyvault_commands.py b/src/azure-cli/azure/cli/command_modules/keyvault/tests/latest/test_keyvault_commands.py index a1df39f8ac5..c0669ebfe27 100644 --- a/src/azure-cli/azure/cli/command_modules/keyvault/tests/latest/test_keyvault_commands.py +++ b/src/azure-cli/azure/cli/command_modules/keyvault/tests/latest/test_keyvault_commands.py @@ -18,8 +18,9 @@ from azure.cli.testsdk.scenario_tests import AllowLargeResponse, record_only from azure.cli.testsdk.scenario_tests import RecordingProcessor from azure.cli.testsdk import ResourceGroupPreparer, StorageAccountPreparer, KeyVaultPreparer, ManagedHSMPreparer, ScenarioTest -from azure.core.exceptions import HttpResponseError +from azure.core.exceptions import HttpResponseError, ResourceNotFoundError from knack.util import CLIError +from azure.cli.command_modules.keyvault.custom import copy_secret secret_text_encoding_values = ['utf-8', 'utf-16le', 'utf-16be', 'ascii'] secret_binary_encoding_values = ['base64', 'hex'] @@ -2778,5 +2779,504 @@ def test_keyvault_mhsm_region(self, resource_group, managed_hsm): self.cmd('keyvault region remove -g {rg} --hsm-name {hsm_name} -r uksouth') +class KeyVaultSecretCopyScenarioTest(ScenarioTest): + """Test suite for keyvault secret copy functionality. + + This test class validates the secret copy command across different scenarios including + single secret copy, bulk copy, overwrite behavior, and error handling. + """ + + @AllowLargeResponse() + @ResourceGroupPreparer(name_prefix='cli_test_kv_secret_copy', location='eastus2') + @KeyVaultPreparer(name_prefix='cli-test-kv-src-', location='eastus2', + additional_params='--enable-rbac-authorization false', parameter_name='src_kv') + @KeyVaultPreparer(name_prefix='cli-test-kv-dst-', location='eastus2', + additional_params='--enable-rbac-authorization false', parameter_name='dest_kv') + def test_keyvault_secret_copy_single(self, resource_group, src_kv, dest_kv): + """Test copying a single secret between key vaults.""" + self.kwargs.update({ + 'src_kv': src_kv, + 'dest_kv': dest_kv, + 'secret1': self.create_random_name('secret-', 24), + 'secret_value': 'TestSecretValue123', + 'loc': 'eastus2' + }) + + # Create secret in source vault with metadata + self.cmd('keyvault secret set --vault-name {src_kv} -n {secret1} --value {secret_value} ' + '--tags environment=test owner=cli --content-type text/plain', + checks=[ + self.check('value', '{secret_value}'), + self.check('contentType', 'text/plain'), + self.check('tags.environment', 'test'), + self.check('tags.owner', 'cli') + ]) + + # Copy single secret to destination vault + self.cmd('keyvault secret copy --source-vault {src_kv} --destination-vault {dest_kv} --name {secret1}') + + # Verify secret was copied with all properties + self.cmd('keyvault secret show --vault-name {dest_kv} -n {secret1}', + checks=[ + self.check('value', '{secret_value}'), + self.check('contentType', 'text/plain'), + self.check('tags.environment', 'test'), + self.check('tags.owner', 'cli'), + self.check('name', '{secret1}') + ]) + + # Verify source secret still exists + self.cmd('keyvault secret show --vault-name {src_kv} -n {secret1}', + checks=[ + self.check('value', '{secret_value}'), + self.check('name', '{secret1}') + ]) + + @AllowLargeResponse() + @ResourceGroupPreparer(name_prefix='cli_test_kv_secret_copy', location='eastus2') + @KeyVaultPreparer(name_prefix='cli-test-kv-src-', location='eastus2', + additional_params='--enable-rbac-authorization false', parameter_name='src_kv') + @KeyVaultPreparer(name_prefix='cli-test-kv-dst-', location='eastus2', + additional_params='--enable-rbac-authorization false', parameter_name='dest_kv') + def test_keyvault_secret_copy_all(self, resource_group, src_kv, dest_kv): + """Test copying all secrets from source to destination vault.""" + self.kwargs.update({ + 'src_kv': src_kv, + 'dest_kv': dest_kv, + 'secret1': self.create_random_name('secret1-', 24), + 'secret2': self.create_random_name('secret2-', 24), + 'secret3': self.create_random_name('secret3-', 24), + 'value1': 'Value1', + 'value2': 'Value2', + 'value3': 'Value3', + 'loc': 'eastus2' + }) + + # Create multiple secrets in source vault + self.cmd('keyvault secret set --vault-name {src_kv} -n {secret1} --value {value1}', + checks=self.check('value', '{value1}')) + self.cmd('keyvault secret set --vault-name {src_kv} -n {secret2} --value {value2}', + checks=self.check('value', '{value2}')) + self.cmd('keyvault secret set --vault-name {src_kv} -n {secret3} --value {value3}', + checks=self.check('value', '{value3}')) + + # Copy all secrets to destination + self.cmd('keyvault secret copy --source-vault {src_kv} --destination-vault {dest_kv} --all') + + # Verify all secrets were copied + self.cmd('keyvault secret show --vault-name {dest_kv} -n {secret1}', + checks=self.check('value', '{value1}')) + self.cmd('keyvault secret show --vault-name {dest_kv} -n {secret2}', + checks=self.check('value', '{value2}')) + self.cmd('keyvault secret show --vault-name {dest_kv} -n {secret3}', + checks=self.check('value', '{value3}')) + + # Verify source secrets still exist + self.cmd('keyvault secret list --vault-name {src_kv}', + checks=self.check('length(@)', 3)) + + @AllowLargeResponse() + @ResourceGroupPreparer(name_prefix='cli_test_kv_secret_copy', location='eastus2') + @KeyVaultPreparer(name_prefix='cli-test-kv-src-', location='eastus2', + additional_params='--enable-rbac-authorization false', parameter_name='src_kv') + @KeyVaultPreparer(name_prefix='cli-test-kv-dst-', location='eastus2', + additional_params='--enable-rbac-authorization false', parameter_name='dest_kv') + def test_keyvault_secret_copy_overwrite_behavior(self, resource_group, src_kv, dest_kv): + """Test overwrite behavior when copying secrets that already exist.""" + self.kwargs.update({ + 'src_kv': src_kv, + 'dest_kv': dest_kv, + 'secret': self.create_random_name('secret-', 24), + 'original_value': 'OriginalValue', + 'updated_value': 'UpdatedValue', + 'loc': 'eastus2' + }) + + # Create secret in source vault + self.cmd('keyvault secret set --vault-name {src_kv} -n {secret} --value {original_value}') + + # Copy secret to destination + self.cmd('keyvault secret copy --source-vault {src_kv} --destination-vault {dest_kv} --name {secret}') + + # Verify secret was copied + self.cmd('keyvault secret show --vault-name {dest_kv} -n {secret}', + checks=self.check('value', '{original_value}')) + + # Update secret in source vault + self.cmd('keyvault secret set --vault-name {src_kv} -n {secret} --value {updated_value}') + + # Copy again without overwrite flag (should skip) + self.cmd('keyvault secret copy --source-vault {src_kv} --destination-vault {dest_kv} --name {secret}') + + # Verify destination still has original value + self.cmd('keyvault secret show --vault-name {dest_kv} -n {secret}', + checks=self.check('value', '{original_value}')) + + # Copy with overwrite flag (should update) + self.cmd('keyvault secret copy --source-vault {src_kv} --destination-vault {dest_kv} ' + '--name {secret} --overwrite') + + # Verify destination now has updated value + self.cmd('keyvault secret show --vault-name {dest_kv} -n {secret}', + checks=self.check('value', '{updated_value}')) + + @AllowLargeResponse() + @ResourceGroupPreparer(name_prefix='cli_test_kv_secret_copy', location='eastus2') + @KeyVaultPreparer(name_prefix='cli-test-kv-src-', location='eastus2', + additional_params='--enable-rbac-authorization false', parameter_name='src_kv') + @KeyVaultPreparer(name_prefix='cli-test-kv-dst-', location='eastus2', + additional_params='--enable-rbac-authorization false', parameter_name='dest_kv') + def test_keyvault_secret_copy_error_cases(self, resource_group, src_kv, dest_kv): + """Test error handling for invalid copy operations.""" + self.kwargs.update({ + 'src_kv': src_kv, + 'dest_kv': dest_kv, + 'nonexistent_kv': 'nonexistent-kv-' + self.create_random_name('', 10), + 'secret': self.create_random_name('secret-', 24), + 'nonexistent_secret': 'nonexistent-secret-' + self.create_random_name('', 10), + 'secret_value': 'TestValue', + 'loc': 'eastus2' + }) + + # Create a secret + self.cmd('keyvault secret set --vault-name {src_kv} -n {secret} --value {secret_value}') + + # Test 1: Copy to non-existent destination vault + # In playback mode, accessing a non-existent vault triggers an unknown host or similar network error, + # but the command should still fail handled. We run this to expect failure. + self.cmd('keyvault secret copy --source-vault {src_kv} --destination-vault {nonexistent_kv} ' + '--name {secret}', + expect_failure=True) + + # Test 2: Copy non-existent secret (should fail) + self.cmd('keyvault secret copy --source-vault {src_kv} --destination-vault {dest_kv} ' + '--name {nonexistent_secret}', + expect_failure=True) + + # Test 2.5: Copy from non-existent source vault (should fail) + self.cmd('keyvault secret copy --source-vault {nonexistent_kv} --destination-vault {dest_kv} ' + '--name {secret}', + expect_failure=True) + + # Test 3: Source and destination are the same (should fail) + self.cmd('keyvault secret copy --source-vault {src_kv} --destination-vault {src_kv} ' + '--name {secret}', + expect_failure=True) + + # Test 4: Using both --name and --all flags (should fail due to mutual exclusivity) + self.cmd('keyvault secret copy --source-vault {src_kv} --destination-vault {dest_kv} ' + '--name {secret} --all', + expect_failure=True) + + @AllowLargeResponse() + @ResourceGroupPreparer(name_prefix='cli_test_kv_secret_copy', location='eastus2') + @KeyVaultPreparer(name_prefix='cli-test-kv-src-', location='eastus2', + additional_params='--enable-rbac-authorization false', parameter_name='src_kv') + @KeyVaultPreparer(name_prefix='cli-test-kv-dst-', location='eastus2', + additional_params='--enable-rbac-authorization false', parameter_name='dest_kv') + def test_keyvault_secret_copy_default_behavior(self, resource_group, src_kv, dest_kv): + """Test default behavior when neither --name nor --all is specified.""" + self.kwargs.update({ + 'src_kv': src_kv, + 'dest_kv': dest_kv, + 'secret1': self.create_random_name('secret1-', 24), + 'secret2': self.create_random_name('secret2-', 24), + 'value1': 'DefaultValue1', + 'value2': 'DefaultValue2', + 'loc': 'eastus2' + }) + + # Create secrets in source vault + self.cmd('keyvault secret set --vault-name {src_kv} -n {secret1} --value {value1}') + self.cmd('keyvault secret set --vault-name {src_kv} -n {secret2} --value {value2}') + + # Copy without specifying --name or --all (should default to --all) + self.cmd('keyvault secret copy --source-vault {src_kv} --destination-vault {dest_kv}') + + # Verify all secrets were copied + self.cmd('keyvault secret show --vault-name {dest_kv} -n {secret1}', + checks=self.check('value', '{value1}')) + self.cmd('keyvault secret show --vault-name {dest_kv} -n {secret2}', + checks=self.check('value', '{value2}')) + + +class KeyVaultSecretCopyUnitTest(unittest.TestCase): + """Unit tests for the copy_secret function with mocked dependencies.""" + + def setUp(self): + """Set up test fixtures and mocks.""" + # Create mock command context + self.cmd = mock.MagicMock() + self.cmd.cli_ctx = mock.MagicMock() + self.cmd.cli_ctx.data = { + 'subscription_id': 'test-subscription-id', + 'headers': {}, + 'completer_active': False, + 'command': 'keyvault secret copy' + } + + # Mock data_plane_azure_keyvault_secret_client + self.patcher_secret_client = mock.patch('azure.cli.command_modules.keyvault._client_factory.data_plane_azure_keyvault_secret_client') + self.mock_secret_client_factory = self.patcher_secret_client.start() + + # Create source and destination client mocks + self.source_client = mock.MagicMock() + self.source_client.vault_url = "https://source-vault.vault.azure.net/" + + self.dest_client = mock.MagicMock() + self.dest_client.vault_url = "https://destination-vault.vault.azure.net/" + self.mock_secret_client_factory.return_value = self.dest_client + + def tearDown(self): + """Clean up mocks.""" + self.patcher_secret_client.stop() + + def _create_mock_secret(self, name, value, content_type=None, tags=None, enabled=True, + not_before=None, expires_on=None): + """Helper method to create a mock secret object.""" + secret = mock.Mock() + secret.name = name + secret.value = value + secret.properties = mock.Mock() + secret.properties.content_type = content_type + secret.properties.tags = tags or {} + secret.properties.enabled = enabled + secret.properties.not_before = not_before + secret.properties.expires_on = expires_on + secret.properties.managed = False + return secret + + def test_copy_single_secret_success(self): + """Test successful copy of a single secret.""" + secret_name = "test-secret" + destination_vault = "https://destination-vault.vault.azure.net/" + + # Mock destination vault connectivity check + not_found_error = HttpResponseError(message="Not Found") + not_found_error.status_code = 404 + + # Mock sequence: connectivity check (404), then existence check (not found) + self.dest_client.get_secret.side_effect = [ + not_found_error, + ResourceNotFoundError("Secret not found") + ] + self.source_client.get_secret.side_effect = [ + not_found_error, + self._create_mock_secret( + name=secret_name, + value="secret-value-123", + content_type="application/json", + tags={"env": "test", "version": "1.0"} + ) + ] + + # Mock successful set operation + dest_secret = self._create_mock_secret( + name=secret_name, + value="secret-value-123" + ) + dest_secret.id = f"{destination_vault}/secrets/{secret_name}" + self.dest_client.set_secret.return_value = dest_secret + + # Execute copy operation + result = copy_secret(self.cmd, self.source_client, destination_vault, name=secret_name) + + # Verify results + self.assertEqual(len(result), 1) + self.assertEqual(result[0]['name'], secret_name) + self.assertIn('id', result[0]) + + # Verify set_secret was called with correct parameters + self.dest_client.set_secret.assert_called_once() + call_args = self.dest_client.set_secret.call_args + self.assertEqual(call_args[0][0], secret_name) + self.assertEqual(call_args[0][1], "secret-value-123") + self.assertEqual(call_args[1]['content_type'], "application/json") + self.assertEqual(call_args[1]['tags'], {"env": "test", "version": "1.0"}) + + def test_copy_secret_already_exists_skip(self): + """Test that existing secrets are skipped when overwrite is False.""" + secret_name = "existing-secret" + destination_vault = "https://destination-vault.vault.azure.net/" + + # Mock destination vault connectivity check + not_found_error = HttpResponseError(message="Not Found") + not_found_error.status_code = 404 + + # Mock sequence: connectivity check (404), then existence check (found) + existing_secret = self._create_mock_secret(name=secret_name, value="existing-value") + self.dest_client.get_secret.side_effect = [not_found_error, existing_secret] + self.source_client.get_secret.side_effect = [not_found_error, existing_secret] + + # Execute copy operation without overwrite + result = copy_secret( + self.cmd, + self.source_client, + destination_vault, + name=secret_name, + overwrite=False + ) + + # Verify secret was skipped + self.assertEqual(len(result), 0) + self.dest_client.set_secret.assert_not_called() + + def test_copy_secret_overwrite_enabled(self): + """Test that existing secrets are overwritten when overwrite is True.""" + secret_name = "overwrite-secret" + destination_vault = "https://destination-vault.vault.azure.net/" + + # Mock destination vault connectivity check only (skip existence check) + not_found_error = HttpResponseError(message="Not Found") + not_found_error.status_code = 404 + self.dest_client.get_secret.side_effect = [not_found_error] + + self.source_client.get_secret.side_effect = [ + not_found_error, + self._create_mock_secret( + name=secret_name, + value="new-overwrite-value", + tags={"updated": "true"} + ) + ] + + # Mock successful set operation + dest_secret = self._create_mock_secret(name=secret_name, value="new-overwrite-value") + dest_secret.id = f"{destination_vault}/secrets/{secret_name}" + self.dest_client.set_secret.return_value = dest_secret + + # Execute copy operation with overwrite + result = copy_secret( + self.cmd, + self.source_client, + destination_vault, + name=secret_name, + overwrite=True + ) + + # Verify secret was overwritten + self.assertEqual(len(result), 1) + self.dest_client.set_secret.assert_called_once() + + def test_copy_all_secrets_success(self): + """Test copying all secrets from source to destination.""" + destination_vault = "https://destination-vault.vault.azure.net/" + + # Mock destination vault connectivity check + not_found_error = HttpResponseError(message="Not Found") + not_found_error.status_code = 404 + + # Mock list of secrets in source vault + secret_props_1 = mock.Mock() + secret_props_1.name = "secret-one" + secret_props_1.managed = False + + secret_props_2 = mock.Mock() + secret_props_2.name = "secret-two" + secret_props_2.managed = False + + secret_props_3 = mock.Mock() + secret_props_3.name = "managed-secret" + secret_props_3.managed = True # This should be skipped + + self.source_client.list_properties_of_secrets.return_value = [ + secret_props_1, + secret_props_2, + secret_props_3 + ] + + # Mock destination vault checks (connectivity + existence for each secret) + self.dest_client.get_secret.side_effect = [ + not_found_error, # Connectivity check + ResourceNotFoundError("Not found"), # secret-one doesn't exist + ResourceNotFoundError("Not found") # secret-two doesn't exist + ] + + # Mock get_secret for source secrets + def get_secret_side_effect(name): + return self._create_mock_secret(name=name, value=f"value-for-{name}") + + self.source_client.get_secret.side_effect = get_secret_side_effect + + # Mock set_secret responses + def set_secret_side_effect(name, value, **kwargs): + secret = self._create_mock_secret(name=name, value=value) + secret.id = f"{destination_vault}/secrets/{name}" + return secret + + self.dest_client.set_secret.side_effect = set_secret_side_effect + + # Execute copy all operation + result = copy_secret(self.cmd, self.source_client, destination_vault, all_secrets=True) + + # Verify only non-managed secrets were copied + self.assertEqual(len(result), 2) + copied_names = {r['name'] for r in result} + self.assertEqual(copied_names, {"secret-one", "secret-two"}) + + # Verify set_secret was called twice (not for managed secret) + self.assertEqual(self.dest_client.set_secret.call_count, 2) + + def test_copy_all_secrets_with_some_existing(self): + """Test copying all secrets when some already exist in destination.""" + destination_vault = "https://destination-vault.vault.azure.net/" + + # Mock destination vault connectivity check + not_found_error = HttpResponseError(message="Not Found") + not_found_error.status_code = 404 + + # Mock list of secrets in source vault + secret_props_1 = mock.Mock() + secret_props_1.name = "secret-new" + secret_props_1.managed = False + + secret_props_2 = mock.Mock() + secret_props_2.name = "secret-existing" + secret_props_2.managed = False + + self.source_client.list_properties_of_secrets.return_value = [ + secret_props_1, + secret_props_2 + ] + + # Mock destination vault checks + existing_secret = self._create_mock_secret(name="secret-existing", value="old-value") + self.dest_client.get_secret.side_effect = [ + not_found_error, # Connectivity check + ResourceNotFoundError("Not found"), # secret-new doesn't exist + existing_secret # secret-existing exists + ] + + # Mock get_secret for source secrets + def get_secret_side_effect(name): + return self._create_mock_secret(name=name, value=f"value-for-{name}") + + self.source_client.get_secret.side_effect = get_secret_side_effect + + # Mock set_secret response + def set_secret_side_effect(name, value, **kwargs): + secret = self._create_mock_secret(name=name, value=value) + secret.id = f"{destination_vault}/secrets/{name}" + return secret + + self.dest_client.set_secret.side_effect = set_secret_side_effect + + # Execute copy all operation without overwrite + result = copy_secret( + self.cmd, + self.source_client, + destination_vault, + all_secrets=True, + overwrite=False + ) + + # Verify only the new secret was copied + self.assertEqual(len(result), 1) + self.assertEqual(result[0]['name'], "secret-new") + + # Verify set_secret was called only once + self.assertEqual(self.dest_client.set_secret.call_count, 1) + + if __name__ == '__main__': unittest.main()