From 8f826993183f6ee10bd9c7e64fe7c6b5c3a21ea1 Mon Sep 17 00:00:00 2001 From: Jvst Me Date: Tue, 3 Feb 2026 20:30:12 +0100 Subject: [PATCH] Add job in-place update event --- .../_internal/server/background/tasks/process_runs.py | 7 +++++++ .../server/background/tasks/test_process_runs.py | 10 ++++++++++ 2 files changed, 17 insertions(+) diff --git a/src/dstack/_internal/server/background/tasks/process_runs.py b/src/dstack/_internal/server/background/tasks/process_runs.py index ee907e519..e9421e5cb 100644 --- a/src/dstack/_internal/server/background/tasks/process_runs.py +++ b/src/dstack/_internal/server/background/tasks/process_runs.py @@ -30,6 +30,7 @@ RunModel, UserModel, ) +from dstack._internal.server.services import events from dstack._internal.server.services.jobs import ( find_job, get_job_specs_from_run_spec, @@ -661,6 +662,12 @@ async def _update_jobs_to_new_deployment_in_place( if can_update_all_jobs: for job_model in job_models: job_model.deployment_num = run_model.deployment_num + events.emit( + session, + f"Job updated. Deployment: {job_model.deployment_num}", + actor=events.SystemActor(), + targets=[events.Target.from_model(job_model)], + ) async def _should_retry_job( diff --git a/src/tests/_internal/server/background/tasks/test_process_runs.py b/src/tests/_internal/server/background/tasks/test_process_runs.py index 46aaa9b48..b9420d8e9 100644 --- a/src/tests/_internal/server/background/tasks/test_process_runs.py +++ b/src/tests/_internal/server/background/tasks/test_process_runs.py @@ -35,6 +35,7 @@ create_user, get_job_provisioning_data, get_run_spec, + list_events, ) from dstack._internal.utils import common @@ -666,6 +667,10 @@ async def test_updates_deployment_num_in_place( assert run.jobs[1].status == job_statuses[1] assert run.jobs[1].replica_num == 1 assert run.jobs[1].deployment_num == 1 # updated + events = await list_events(session) + assert len(events) == 2 + assert events[0].message == "Job updated. Deployment: 1" + assert events[1].message == "Job updated. Deployment: 1" async def test_not_updates_deployment_num_in_place_for_finished_replica( self, test_db, session: AsyncSession @@ -697,6 +702,11 @@ async def test_not_updates_deployment_num_in_place_for_finished_replica( assert run.jobs[1].status == JobStatus.TERMINATED assert run.jobs[1].replica_num == 1 assert run.jobs[1].deployment_num == 0 # not updated + events = await list_events(session) + assert len(events) == 1 + assert events[0].message == "Job updated. Deployment: 1" + assert len(events[0].targets) == 1 + assert events[0].targets[0].entity_id == run.jobs[0].id async def test_starts_new_replica(self, test_db, session: AsyncSession) -> None: run = await make_run(session, status=RunStatus.RUNNING, replicas=2, image="old")