Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions src/azure-cli/azure/cli/command_modules/vm/_help.py
Original file line number Diff line number Diff line change
Expand Up @@ -987,6 +987,23 @@
text: az vm list-sizes -l westus
"""

helps['vm cp'] = """
type: command
short-summary: Copy files to and from a virtual machine.
long-summary: >
This command uses an Azure Storage blob container as an intermediary bridge to transfer files.
It requires that the target VM is running and accessible via 'az vm run-command'. On Linux VMs, 'curl'
must be installed and available on the PATH. The identity used to run this command must have sufficient
permissions both on the VM (to execute run-command) and on the storage account/container (to read and write blobs).
examples:
- name: Upload a local file to a VM.
text: az vm cp --source /path/to/local/file --destination my-rg:my-vm:/path/to/remote/file
- name: Download a file from a VM to local.
text: az vm cp --source my-rg:my-vm:/path/to/remote/file --destination /path/to/local/file
- name: Upload a local file to a VM using a specific storage account.
text: az vm cp --source /path/to/local/file --destination my-vm:/path/to/remote/file --storage-account mystorageaccount
"""

helps['vm availability-set create'] = """
type: command
short-summary: Create an Azure Availability Set.
Expand Down
6 changes: 6 additions & 0 deletions src/azure-cli/azure/cli/command_modules/vm/_params.py
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,12 @@ def load_arguments(self, _):
with self.argument_context(scope) as c:
c.ignore('include_user_data')

with self.argument_context('vm cp') as c:
c.argument('source', help='The source path. Use [resource-group:]vm-name:path for VM files, or a local path.')
c.argument('destination', help='The destination path. Use [resource-group:]vm-name:path for VM files, or a local path.')
c.argument('storage_account', help='The name or ID of the storage account to use as a bridge.')
c.argument('container_name', help='The name of the container to use in the storage account. Default: azvmcp')

with self.argument_context('vm diagnostics') as c:
c.argument('vm_name', arg_type=existing_vm_name, options_list=['--vm-name'])

Expand Down
1 change: 1 addition & 0 deletions src/azure-cli/azure/cli/command_modules/vm/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,7 @@ def load_command_table(self, _):
g.wait_command('wait', getter_name='get_instance_view', getter_type=compute_custom)
g.custom_command('auto-shutdown', 'auto_shutdown_vm')
g.custom_command('list-sizes', 'list_vm_sizes', deprecate_info=g.deprecate(redirect='az vm list-skus'))
g.custom_command('cp', 'vm_cp')

from .operations.vm import VMCapture
self.command_table['vm capture'] = VMCapture(loader=self)
Expand Down
219 changes: 216 additions & 3 deletions src/azure-cli/azure/cli/command_modules/vm/custom.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
# pylint: disable=protected-access
import json
import os
import uuid
from datetime import datetime, timedelta

import requests

Expand All @@ -22,6 +24,7 @@

from knack.log import get_logger
from knack.util import CLIError

from azure.cli.core.azclierror import (
ResourceNotFoundError,
ValidationError,
Expand Down Expand Up @@ -109,7 +112,6 @@ def _construct_identity_info(identity_scope, identity_role, implicit_identity, e

# for injecting test seams to produce predicatable role assignment id for playback
def _gen_guid():
import uuid
return uuid.uuid4()


Expand Down Expand Up @@ -3425,8 +3427,7 @@ def attach_unmanaged_data_disk(cmd, resource_group_name, vm_name, new=False, vhd

vm = get_vm_to_update(cmd, resource_group_name, vm_name)
if disk_name is None:
import datetime
disk_name = vm_name + '-' + datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
disk_name = vm_name + '-' + datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
# pylint: disable=no-member
if vhd_uri is None:
if not hasattr(vm.storage_profile.os_disk, 'vhd') or not vm.storage_profile.os_disk.vhd:
Expand Down Expand Up @@ -6536,3 +6537,215 @@ def list_vm_sizes(cmd, location):


# endRegion

def _is_windows_absolute_path(path):
"""
Detect a Windows-style absolute path such as 'C:\\path\\to\\file' or 'D:/file.txt'.
This is a purely syntactic check and does not touch the filesystem.
"""
return (
isinstance(path, str) and
len(path) >= 3 and
path[0].isalpha() and
path[1] == ':' and
path[2] in ('\\', '/')
)


def _parse_vm_file_path(path):
if ':' not in path or _is_windows_absolute_path(path):
return None

first_colon = path.find(':')
# second_colon - find the next colon after the first one
second_colon = path.find(':', first_colon + 1)

if second_colon == -1:
# vm-name:path
vm_name = path[:first_colon]
vm_path = path[first_colon + 1:]
if vm_name and vm_path:
return None, vm_name, vm_path
else:
# Check if it's vm:C:\path (second colon is a drive letter)
if second_colon - first_colon == 2 and path[first_colon + 1].isalpha():
vm_name = path[:first_colon]
vm_path = path[first_colon + 1:]
if vm_name and vm_path:
return None, vm_name, vm_path

# rg:vm-name:path...
rg_name = path[:first_colon]
vm_name = path[first_colon + 1:second_colon]
vm_path = path[second_colon + 1:]
if rg_name and vm_name and vm_path:
return rg_name, vm_name, vm_path

return None


def _get_vm_and_rg(cmd, vm_name, rg=None):
client = _compute_client_factory(cmd.cli_ctx)
if rg:
vm = client.virtual_machines.get(rg, vm_name)
return vm, rg

# Search for VM across all RGs
vms = client.virtual_machines.list_all()
vm = next((v for v in vms if v.name.lower() == vm_name.lower()), None)
if not vm:
raise ResourceNotFoundError("VM '{}' not found.".format(vm_name))
# parse RG from ID
rg = vm.id.split('/')[4]
return vm, rg


def vm_cp(cmd, source, destination, storage_account=None, container_name='azvmcp'):
import shlex
from azure.core.exceptions import ResourceExistsError
from azure.cli.command_modules.storage.operations.blob import upload_blob, download_blob
from azure.cli.command_modules.storage.util import create_short_lived_blob_sas_v2
from azure.cli.command_modules.storage._client_factory import cf_sa, cf_sa_for_keys, cf_blob_service
from .aaz.latest.vm.run_command import Invoke

source_vm = _parse_vm_file_path(source)
dest_vm = _parse_vm_file_path(destination)

if source_vm and dest_vm:
raise ValidationError("Both source and destination cannot be VM paths.")
if not source_vm and not dest_vm:
raise ValidationError("Either source or destination must be a VM path (format: [rg:]vm:path).")

# 1. Prepare Storage Account
if not storage_account:
# Try to find a storage account in the VM's resource group
rg_provided = (source_vm[0] if source_vm else dest_vm[0])
vm_name = (source_vm[1] if source_vm else dest_vm[1])

_, rg = _get_vm_and_rg(cmd, vm_name, rg_provided)

sa_client = cf_sa(cmd.cli_ctx, None)
accounts = list(sa_client.list())
# Filter by RG if possible
rg_accounts = [a for a in accounts if a.id.split('/')[4].lower() == rg.lower()]
if rg_accounts:
storage_account = rg_accounts[0].name
elif accounts:
storage_account = accounts[0].name
else:
raise RequiredArgumentMissingError("No storage account found in the subscription. Please provide one with --storage-account.")

# Get account key
sa_keys_client = cf_sa_for_keys(cmd.cli_ctx, None)
# Check if storage_account is name or ID
if '/' in storage_account:
sa_rg = storage_account.split('/')[4]
sa_name = storage_account.split('/')[-1]
else:
# Search for it
sa_client = cf_sa(cmd.cli_ctx, None)
accounts = list(sa_client.list())
account = next((a for a in accounts if a.name.lower() == storage_account.lower()), None)
if not account:
raise ResourceNotFoundError("Storage account '{}' not found.".format(storage_account))
sa_rg = account.id.split('/')[4]
sa_name = account.name

keys = sa_keys_client.list_keys(sa_rg, sa_name).keys
account_key = keys[0].value

# Ensure container exists
blob_service_client = cf_blob_service(cmd.cli_ctx, {'account_name': sa_name, 'account_key': account_key})
container_client = blob_service_client.get_container_client(container_name)
try:
container_client.create_container()
Comment on lines +6658 to +6661
Copy link

Copilot AI Feb 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The bare except clause catches all exceptions silently, which can hide important errors like authentication failures, network issues, or permission problems. This makes debugging difficult for users. Change this to catch specific exceptions like ResourceExistsError and let other exceptions propagate with helpful error messages.

Copilot uses AI. Check for mistakes.
except ResourceExistsError:
pass

blob_name = str(uuid.uuid4())
blob_client = container_client.get_blob_client(blob_name)

# Common VM info
vm_info = dest_vm if dest_vm else source_vm
rg_provided, vm_name, vm_path = vm_info
vm_obj, rg = _get_vm_and_rg(cmd, vm_name, rg_provided)
is_linux = vm_obj.storage_profile.os_disk.os_type.lower() == 'linux'

try:
if dest_vm:
# UPLOAD: Local -> VM
logger.info("Uploading local file to bridge storage...")
upload_blob(cmd, blob_client, file_path=source)

# Get SAS with READ permission (2 hours expiry)
sas_token = create_short_lived_blob_sas_v2(cmd, sa_name, container_name, blob_name, account_key=account_key)
else:
Copy link

Copilot AI Feb 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Security vulnerability: The PowerShell script includes user-provided vm_path without proper escaping. If vm_path contains single quotes or other PowerShell special characters, it could enable command injection. Use proper PowerShell escaping before constructing the script.

Suggested change
else:
# Escape single quotes in vm_path for safe use inside a single-quoted PowerShell string
escaped_vm_path = vm_path.replace("'", "''") if vm_path is not None else vm_path
script = "$body = Get-Content -Path '{}' -Encoding Byte; Invoke-RestMethod -Uri '{}' -Method Put -Headers @{{'x-ms-blob-type'='BlockBlob'}} -Body $body".format(escaped_vm_path, blob_url)

Copilot uses AI. Check for mistakes.
Copy link

Copilot AI Feb 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: The PowerShell command uses Get-Content with -Encoding Byte, but this parameter was deprecated in PowerShell Core and removed in PowerShell 7. For Windows VMs running PowerShell 7, this will fail. Use -AsByteStream instead for PowerShell Core/7 compatibility, or detect the PowerShell version and use the appropriate parameter.

Suggested change
else:
script = "$body = if ($PSVersionTable.PSVersion.Major -ge 6) {{ Get-Content -Path '{}' -AsByteStream }} else {{ Get-Content -Path '{}' -Encoding Byte }}; Invoke-RestMethod -Uri '{}' -Method Put -Headers @{{'x-ms-blob-type'='BlockBlob'}} -Body $body".format(vm_path, vm_path, blob_url)

Copilot uses AI. Check for mistakes.
# DOWNLOAD: VM -> Local
# Get SAS with WRITE permission (2 hours expiry)
t_sas = cmd.get_models('_shared_access_signature#BlobSharedAccessSignature',
resource_type=ResourceType.DATA_STORAGE_BLOB)
t_blob_permissions = cmd.get_models('_models#BlobSasPermissions', resource_type=ResourceType.DATA_STORAGE_BLOB)
expiry = (datetime.utcnow() + timedelta(hours=2)).strftime('%Y-%m-%dT%H:%M:%SZ')
sas = t_sas(sa_name, account_key=account_key)
sas_token = sas.generate_blob(container_name, blob_name,
permission=t_blob_permissions(write=True),
expiry=expiry, protocol='https')

storage_suffix = getattr(cmd.cli_ctx.cloud.suffixes, 'storage_endpoint', None)
if not storage_suffix:
raise CLIError("The storage endpoint suffix for the current cloud is not configured.")
# Normalize to avoid leading dots or slashes that would break the URL host
storage_suffix = str(storage_suffix).lstrip("./").strip()

blob_url = "https://{}.blob.{}/{}/{}?{}".format(
sa_name, storage_suffix, container_name, blob_name, sas_token)

if dest_vm:
# Script for VM to download from blob
if is_linux:
script = "curl -L -f -s -S -o {} {}".format(shlex.quote(vm_path), shlex.quote(blob_url))
command_id = 'RunShellScript'
else:
escaped_vm_path = vm_path.replace("'", "''")
escaped_blob_url = blob_url.replace("'", "''")
script = "Invoke-WebRequest -Uri '{}' -OutFile '{}'".format(escaped_blob_url, escaped_vm_path)
command_id = 'RunPowerShellScript'
else:
# Script for VM to upload to blob
if is_linux:
script = "curl -X PUT -T {} -H 'x-ms-blob-type: BlockBlob' {}".format(shlex.quote(vm_path), shlex.quote(blob_url))
command_id = 'RunShellScript'
else:
escaped_vm_path = vm_path.replace("'", "''")
escaped_blob_url = blob_url.replace("'", "''")
# Cross-version compatible Get-Content as suggested by Copilot
script = ("$body = if ($PSVersionTable.PSVersion.Major -ge 6) {{ Get-Content -Path '{path}' -AsByteStream }} "
"else {{ Get-Content -Path '{path}' -Encoding Byte }}; "
"Invoke-RestMethod -Uri '{url}' -Method Put -Headers @{{'x-ms-blob-type'='BlockBlob'}} -Body $body").format(
path=escaped_vm_path, url=escaped_blob_url)
command_id = 'RunPowerShellScript'

logger.info("Executing transfer script in VM...")
result = Invoke(cli_ctx=cmd.cli_ctx)(command_args={
'resource_group': rg,
'vm_name': vm_name,
'command_id': command_id,
'script': [script]
})

if result.get('value') and result['value'][0].get('message'):
message = result['value'][0]['message']
if 'failed' in message.lower() or 'error' in message.lower():
raise CLIError("VM execution failed: {}".format(message))
if not dest_vm:
logger.info("Downloading from bridge storage to local...")
download_blob(blob_client, file_path=destination)
finally:
# Cleanup bridge storage
try:
logger.info("Cleaning up bridge storage...")
blob_client.delete_blob()
except Exception: # pylint: disable=broad-except
pass

Comment on lines +6585 to +6750
Copy link

Copilot AI Feb 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing test coverage: There are no tests for the new vm_cp command. The vm module has extensive test files (test_vm_commands.py has 13,881 lines), indicating that comprehensive testing is a convention in this codebase. The vm_cp command involves complex logic with multiple error paths and should have unit tests covering various scenarios including: path parsing edge cases, VM/storage account discovery, upload/download flows, error handling, and cleanup behavior.

Copilot uses AI. Check for mistakes.
return {"message": "File transfer successful."}
Loading