From 079948f083848837487057461dc5cb5f29876a84 Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Fri, 23 Jan 2026 15:34:42 +0000 Subject: [PATCH 01/11] lowercase job names when versioning --- lib/lightning/workflow_versions.ex | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lib/lightning/workflow_versions.ex b/lib/lightning/workflow_versions.ex index 52f0021299..d7c06cd186 100644 --- a/lib/lightning/workflow_versions.ex +++ b/lib/lightning/workflow_versions.ex @@ -378,6 +378,8 @@ defmodule Lightning.WorkflowVersions do edges_hash_list ]) + IO.puts("Hash input string: #{joined_data}") + :crypto.hash(:sha256, joined_data) |> Base.encode16(case: :lower) |> binary_part(0, 12) From 27f39fcf7f2ebbe50550a26c9df17c1ea7884a0b Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Fri, 23 Jan 2026 17:24:54 +0000 Subject: [PATCH 02/11] changelog --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index e6004dd3b7..c53e20216f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -93,6 +93,8 @@ and this project adheres to - Standardise copy button feedback across collaborative editor [#3578](https://github.com/OpenFn/lightning/issues/3578) +- Modified version hashing algorithm for CLI compatibility + [#4346](https://github.com/OpenFn/lightning/issues/4346) ### Fixed From fc4151159575be2c30854e3cd1694202a62a3265 Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Thu, 29 Jan 2026 09:40:46 +0000 Subject: [PATCH 03/11] remove log --- lib/lightning/workflow_versions.ex | 2 -- 1 file changed, 2 deletions(-) diff --git a/lib/lightning/workflow_versions.ex b/lib/lightning/workflow_versions.ex index d7c06cd186..52f0021299 100644 --- a/lib/lightning/workflow_versions.ex +++ b/lib/lightning/workflow_versions.ex @@ -378,8 +378,6 @@ defmodule Lightning.WorkflowVersions do edges_hash_list ]) - IO.puts("Hash input string: #{joined_data}") - :crypto.hash(:sha256, joined_data) |> Base.encode16(case: :lower) |> binary_part(0, 12) From d252cdb7e15e016ff1f4363cf9d46fedf37df85e Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Sun, 1 Feb 2026 16:37:55 +0000 Subject: [PATCH 04/11] drop version_history cache on the Workflow table --- lib/lightning/workflow_versions.ex | 147 ++++-------------- lib/lightning/workflows/workflow.ex | 1 - lib/lightning/workflows/workflow_version.ex | 6 +- .../controllers/api/provisioning_json.ex | 2 +- ..._remove_version_history_from_workflows.exs | 30 ++++ 5 files changed, 62 insertions(+), 124 deletions(-) create mode 100644 priv/repo/migrations/20260201163115_remove_version_history_from_workflows.exs diff --git a/lib/lightning/workflow_versions.ex b/lib/lightning/workflow_versions.ex index 52f0021299..c77d60e0b7 100644 --- a/lib/lightning/workflow_versions.ex +++ b/lib/lightning/workflow_versions.ex @@ -2,24 +2,20 @@ defmodule Lightning.WorkflowVersions do @moduledoc """ Provenance + comparison helpers for workflow heads. - - Persists append-only rows in `workflow_versions` and maintains a materialized - `workflows.version_history` array (12-char lowercase hex). - - `record_version/3` and `record_versions/3` are **idempotent** (`ON CONFLICT DO NOTHING`) - and **concurrency-safe** (row lock, append without dupes). - - `history_for/1` and `latest_hash/1` read the array first; when empty they fall back - to the table with deterministic ordering by `(inserted_at, id)`. - - `reconcile_history!/1` rebuilds the array from provenance rows. + - Persists append-only rows in `workflow_versions` with deterministic ordering + by `(inserted_at, id)`. 
+ - `record_version/3` is **idempotent** and **concurrency-safe** (squashes + consecutive versions with the same source). + - `history_for/1` and `latest_hash/1` query the table with deterministic ordering. - `classify/2` and `classify_with_delta/2` compare two histories (same/ahead/diverged). Validation & invariants: - - `hash` must match `^[a-f0-9]{12}$`; `source` must be `"app"` or `"cli"`; - `(workflow_id, hash)` is unique. + - `hash` must match `^[a-f0-9]{12}$`; `source` must be `"app"` or `"cli"`. - Designed for fast diffs and consistent “latest head” lookups. + Designed for fast diffs and consistent "latest head" lookups. """ import Ecto.Query - alias Ecto.Changeset alias Ecto.Multi alias Lightning.Repo alias Lightning.Validators.Hex @@ -31,13 +27,12 @@ defmodule Lightning.WorkflowVersions do @sources ~w(app cli) @doc """ - Records a **single** workflow head `hash` with provenance and keeps - `workflows.version_history` in sync. + Records a **single** workflow head `hash` with provenance. This operation is **idempotent** and **concurrency-safe**: - it inserts into `workflow_versions` with `ON CONFLICT DO NOTHING`, then - locks the workflow row (`FOR UPDATE`) and appends `hash` to the array only - if it is not already present. + - If the latest version has the same source, it squashes (replaces) it + - If the hash+source already exists, it does nothing + - Otherwise, it inserts a new row ## Parameters * `workflow` — the workflow owning the history @@ -45,14 +40,13 @@ defmodule Lightning.WorkflowVersions do * `source` — `"app"` or `"cli"` (defaults to `"app"`) ## Returns - * `{:ok, %Workflow{}}` — workflow (possibly unchanged) with an updated - `version_history` if a new `hash` was appended + * `{:ok, %Workflow{}}` — workflow (unchanged) * `{:error, reason}` — database error ## Examples iex> WorkflowVersions.record_version(wf, "deadbeefcafe", "app") - {:ok, %Workflow{version_history: [..., "deadbeefcafe"]}} + {:ok, %Workflow{}} iex> WorkflowVersions.record_version(wf, "NOT_HEX", "app") {:error, :invalid_input} @@ -89,10 +83,9 @@ defmodule Lightning.WorkflowVersions do }) ) |> maybe_delete_current_latest() - |> update_workflow_history(workflow) |> Repo.transaction() |> case do - {:ok, %{update_workflow: updated}} -> {:ok, updated} + {:ok, _} -> {:ok, workflow} {:error, _op, reason, _} -> {:error, reason} end else @@ -128,69 +121,17 @@ defmodule Lightning.WorkflowVersions do ) end - defp update_workflow_history(multi, workflow) do - Multi.run( - multi, - :update_workflow, - fn repo, %{new_version: new_version, delete_latest: deleted_version} -> - workflow = - from(w in Workflow, where: w.id == ^workflow.id, lock: "FOR UPDATE") - |> repo.one!() - - workflow - |> Changeset.change( - version_history: - build_version_history( - workflow.version_history || [], - new_version, - deleted_version - ) - ) - |> repo.update() - end - ) - end - - defp build_version_history(history, %{} = new_version, deleted_version) do - version_string = format_version(new_version) - hist = maybe_remove_squashed_version(history, deleted_version) - hist ++ [version_string] - end - - defp build_version_history(history, nil, _deleted), do: history - - defp maybe_remove_squashed_version(history, %{} = deleted_version) do - deleted_string = format_version(deleted_version) - - case List.last(history) do - ^deleted_string -> List.delete_at(history, -1) - _ -> history - end - end - - defp maybe_remove_squashed_version(history, nil), do: history - - defp format_version(%{source: source, hash: hash}), do: "#{source}:#{hash}" 
- @doc """ Returns the **ordered** history of heads for a workflow. - If `workflow.version_history` is present and non-empty, that array is returned. - Otherwise, the function falls back to `workflow_versions` ordered by - `inserted_at ASC, id ASC` to provide deterministic ordering for equal timestamps. + Queries `workflow_versions` ordered by `inserted_at ASC, id ASC` to provide + deterministic ordering for equal timestamps. ## Examples - iex> WorkflowVersions.history_for(%Workflow{version_history: ["a", "b"]}) - ["a", "b"] - - iex> WorkflowVersions.history_for(wf) # when array is empty/nil - ["a", "b", "c"] + iex> WorkflowVersions.history_for(wf) + ["app:a", "cli:b", "app:c"] """ - def history_for(%Workflow{version_history: arr}) - when is_list(arr) and arr != [], - do: arr - def history_for(%Workflow{id: id}) do from(v in WorkflowVersion, where: v.workflow_id == ^id, @@ -203,33 +144,26 @@ defmodule Lightning.WorkflowVersions do @doc """ Returns the **latest** head for a workflow (or `nil` if none). - Uses `workflow.version_history` when populated (taking the last element). - If empty/nil, reads from `workflow_versions` with - `ORDER BY inserted_at DESC, id DESC LIMIT 1` for deterministic results. + Queries `workflow_versions` with `ORDER BY inserted_at DESC, id DESC LIMIT 1` + for deterministic results. ## Examples - iex> WorkflowVersions.latest_hash(%Workflow{version_history: ["a", "b"]}) - "b" + iex> WorkflowVersions.latest_hash(wf) + "app:b" iex> WorkflowVersions.latest_hash(wf_without_versions) nil """ @spec latest_hash(Workflow.t()) :: hash | nil def latest_hash(%Workflow{} = wf) do - case wf.version_history do - list when is_list(list) and list != [] -> - List.last(list) - - _ -> - from(v in WorkflowVersion, - where: v.workflow_id == ^wf.id, - order_by: [desc: v.inserted_at, desc: v.id], - limit: 1, - select: fragment("? || ':' || ?", v.source, v.hash) - ) - |> Repo.one() - end + from(v in WorkflowVersion, + where: v.workflow_id == ^wf.id, + order_by: [desc: v.inserted_at, desc: v.id], + limit: 1, + select: fragment("? || ':' || ?", v.source, v.hash) + ) + |> Repo.one() end defp latest_version(workflow_id) do @@ -241,29 +175,6 @@ defmodule Lightning.WorkflowVersions do |> Repo.one() end - @doc """ - Rebuilds and **persists** `workflow.version_history` from provenance rows. - - This is useful for maintenance/migrations when the array drifts from the - `workflow_versions` table. Ordering is `inserted_at ASC, id ASC`. - - ## Returns - * `%Workflow{}` — updated workflow with a rebuilt `version_history` - - ## Examples - - iex> wf = WorkflowVersions.reconcile_history!(wf) - %Workflow{version_history: [...]} - """ - @spec reconcile_history!(Workflow.t()) :: Workflow.t() - def reconcile_history!(%Workflow{id: id} = wf) do - arr = history_for(%Workflow{id: id, version_history: []}) - - wf - |> Changeset.change(version_history: arr) - |> Repo.update!() - end - @doc """ Generates a deterministic hash for a workflow based on its structure. 
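For reference, the squash semantics introduced above can be summarised as: consecutive heads recorded from the same source collapse to the newest one, while a head from a different source is appended. A minimal sketch, not part of this patch and mirroring the tests later in this series — `wf` is assumed to be an already-persisted workflow and the hashes are illustrative 12-character hex values:

    iex> alias Lightning.WorkflowVersions
    iex> {:ok, _} = WorkflowVersions.record_version(wf, "aaaaaaaaaaaa", "app")
    iex> {:ok, _} = WorkflowVersions.record_version(wf, "bbbbbbbbbbbb", "cli")
    iex> {:ok, _} = WorkflowVersions.record_version(wf, "cccccccccccc", "cli")
    iex> # the second "cli" head replaces the first; the "app" head is preserved
    iex> WorkflowVersions.history_for(wf)
    ["app:aaaaaaaaaaaa", "cli:cccccccccccc"]
    iex> WorkflowVersions.latest_hash(wf)
    "cli:cccccccccccc"
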
diff --git a/lib/lightning/workflows/workflow.ex b/lib/lightning/workflows/workflow.ex index 973faa2190..448bf8c764 100644 --- a/lib/lightning/workflows/workflow.ex +++ b/lib/lightning/workflows/workflow.ex @@ -43,7 +43,6 @@ defmodule Lightning.Workflows.Workflow do field :concurrency, :integer, default: nil field :enable_job_logs, :boolean, default: true field :positions, :map - field :version_history, {:array, :string}, default: [] has_many :edges, Edge, on_replace: :delete_if_exists has_many :jobs, Job, on_replace: :delete diff --git a/lib/lightning/workflows/workflow_version.ex b/lib/lightning/workflows/workflow_version.ex index a68abf415f..52159b9683 100644 --- a/lib/lightning/workflows/workflow_version.ex +++ b/lib/lightning/workflows/workflow_version.ex @@ -5,11 +5,9 @@ defmodule Lightning.Workflows.WorkflowVersion do - One row per head: `hash` (12-char lowercase hex), `source` ("app" | "cli"), `workflow_id`, `inserted_at` (UTC μs). - Append-only: `updated_at` disabled; rows are never mutated. - - Uniqueness: `(workflow_id, hash)` unique; same hash may exist across workflows. - - Validation mirrors DB checks: hash format, allowed sources, valid `workflow_id`. + - Validation: hash format, allowed sources, valid `workflow_id`. - Deterministic ordering via `:utc_datetime_usec` timestamps. - - Use `Lightning.WorkflowVersions` to record/query and keep - `workflows.version_history` in sync. + - Use `Lightning.WorkflowVersions` to record/query workflow versions. """ use Lightning.Schema import Ecto.Changeset diff --git a/lib/lightning_web/controllers/api/provisioning_json.ex b/lib/lightning_web/controllers/api/provisioning_json.ex index 1df123d66e..c373f844cf 100644 --- a/lib/lightning_web/controllers/api/provisioning_json.ex +++ b/lib/lightning_web/controllers/api/provisioning_json.ex @@ -57,7 +57,7 @@ defmodule LightningWeb.API.ProvisioningJSON do workflow_or_snapshot |> Ecto.embedded_dump(:json) |> Map.take( - ~w(id name inserted_at updated_at deleted_at lock_version concurrency version_history)a + ~w(id name inserted_at updated_at deleted_at lock_version concurrency)a ) |> Map.put(:id, workflow_id) |> Map.put( diff --git a/priv/repo/migrations/20260201163115_remove_version_history_from_workflows.exs b/priv/repo/migrations/20260201163115_remove_version_history_from_workflows.exs new file mode 100644 index 0000000000..ad887413cd --- /dev/null +++ b/priv/repo/migrations/20260201163115_remove_version_history_from_workflows.exs @@ -0,0 +1,30 @@ +defmodule Lightning.Repo.Migrations.RemoveVersionHistoryFromWorkflows do + use Ecto.Migration + + def up do + execute """ + ALTER TABLE workflows + DROP CONSTRAINT IF EXISTS workflows_version_history_source_hash_format; + """ + + alter table(:workflows) do + remove :version_history + end + end + + def down do + alter table(:workflows) do + add :version_history, {:array, :string}, default: [], null: false + end + + execute """ + ALTER TABLE workflows + ADD CONSTRAINT workflows_version_history_source_hash_format + CHECK ( + version_history IS NULL OR + array_length(version_history, 1) IS NULL OR + array_to_string(version_history, '|') ~ '^(app|cli):[a-f0-9]{12}(\\|(app|cli):[a-f0-9]{12})*$' + ); + """ + end +end From 0851bf4c4754ff097e75426ffb0ad6605a2569f0 Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Sun, 1 Feb 2026 17:03:20 +0000 Subject: [PATCH 05/11] add migration to drop all workflow histories Since the algo has changed we may as well clean the slate --- .../20260201163115_remove_version_history_from_workflows.exs | 3 +++ 1 file changed, 3 
insertions(+) diff --git a/priv/repo/migrations/20260201163115_remove_version_history_from_workflows.exs b/priv/repo/migrations/20260201163115_remove_version_history_from_workflows.exs index ad887413cd..027b199af4 100644 --- a/priv/repo/migrations/20260201163115_remove_version_history_from_workflows.exs +++ b/priv/repo/migrations/20260201163115_remove_version_history_from_workflows.exs @@ -10,6 +10,9 @@ defmodule Lightning.Repo.Migrations.RemoveVersionHistoryFromWorkflows do alter table(:workflows) do remove :version_history end + + # Clean slate: remove all workflow_versions rows since they were out of sync + execute "DELETE FROM workflow_versions" end def down do From 6b267cb49446f82ae21d638d2cb8ff5ef1081df7 Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Sun, 1 Feb 2026 18:10:29 +0000 Subject: [PATCH 06/11] tests --- test/integration/cli_deploy_test.exs | 3 +- test/lightning/workflow_versions_test.exs | 106 ++++++++------------- test/lightning/workflows/workflow_test.exs | 5 - test/lightning/workflows_test.exs | 8 +- 4 files changed, 46 insertions(+), 76 deletions(-) diff --git a/test/integration/cli_deploy_test.exs b/test/integration/cli_deploy_test.exs index 5ccf7a2322..b3193a3202 100644 --- a/test/integration/cli_deploy_test.exs +++ b/test/integration/cli_deploy_test.exs @@ -324,8 +324,7 @@ defmodule Lightning.CliDeployTest do :updated_at, :deleted_at, :lock_version, - :concurrency, - :version_history + :concurrency ]) jobs = diff --git a/test/lightning/workflow_versions_test.exs b/test/lightning/workflow_versions_test.exs index c5c6ba4cd7..5ce7a7506f 100644 --- a/test/lightning/workflow_versions_test.exs +++ b/test/lightning/workflow_versions_test.exs @@ -18,22 +18,31 @@ defmodule Lightning.WorkflowVersionsTest do |> Repo.aggregate(:count, :id) end + defp get_history(workflow_id) do + from(v in WorkflowVersion, + where: v.workflow_id == ^workflow_id, + order_by: [asc: v.inserted_at, asc: v.id], + select: fragment("? 
|| ':' || ?", v.source, v.hash) + ) + |> Repo.all() + end + describe "record_version/3" do - test "inserts a row and appends to workflow.version_history (idempotent)" do + test "inserts a row (idempotent)" do wf = insert(:workflow) assert {:ok, wf1} = WorkflowVersions.record_version(wf, @a, "app") - assert wf1.version_history == ["app:#{@a}"] + assert get_history(wf.id) == ["app:#{@a}"] assert count_rows(wf.id) == 1 # same call again -> still one row; history unchanged assert {:ok, wf2} = WorkflowVersions.record_version(wf1, @a, "app") - assert wf2.version_history == ["app:#{@a}"] + assert get_history(wf.id) == ["app:#{@a}"] assert count_rows(wf.id) == 1 # different hash -> appended assert {:ok, wf3} = WorkflowVersions.record_version(wf2, @b, "cli") - assert wf3.version_history == ["app:#{@a}", "cli:#{@b}"] + assert get_history(wf.id) == ["app:#{@a}", "cli:#{@b}"] assert count_rows(wf.id) == 2 end @@ -52,23 +61,23 @@ defmodule Lightning.WorkflowVersionsTest do # Insert first version assert {:ok, wf1} = WorkflowVersions.record_version(wf, @a, "app") - assert wf1.version_history == ["app:#{@a}"] + assert get_history(wf.id) == ["app:#{@a}"] assert count_rows(wf.id) == 1 # Insert different hash assert {:ok, wf2} = WorkflowVersions.record_version(wf1, @b, "cli") - assert wf2.version_history == ["app:#{@a}", "cli:#{@b}"] + assert get_history(wf.id) == ["app:#{@a}", "cli:#{@b}"] assert count_rows(wf.id) == 2 # Try to insert the same hash and source as latest (duplicate) assert {:ok, wf3} = WorkflowVersions.record_version(wf2, @b, "cli") - assert wf3.version_history == ["app:#{@a}", "cli:#{@b}"] + assert get_history(wf.id) == ["app:#{@a}", "cli:#{@b}"] # Should still be 2, no new row inserted assert count_rows(wf.id) == 2 # Try with different source but same hash - NOT a duplicate, will insert assert {:ok, wf4} = WorkflowVersions.record_version(wf3, @b, "app") - assert wf4.version_history == ["app:#{@a}", "cli:#{@b}", "app:#{@b}"] + assert get_history(wf.id) == ["app:#{@a}", "cli:#{@b}", "app:#{@b}"] # Now we have 3 rows assert count_rows(wf.id) == 3 end @@ -78,18 +87,18 @@ defmodule Lightning.WorkflowVersionsTest do # Insert first version from app assert {:ok, wf1} = WorkflowVersions.record_version(wf, @a, "app") - assert wf1.version_history == ["app:#{@a}"] + assert get_history(wf.id) == ["app:#{@a}"] assert count_rows(wf.id) == 1 # Insert second version from cli assert {:ok, wf2} = WorkflowVersions.record_version(wf1, @b, "cli") - assert wf2.version_history == ["app:#{@a}", "cli:#{@b}"] + assert get_history(wf.id) == ["app:#{@a}", "cli:#{@b}"] assert count_rows(wf.id) == 2 # Insert third version from cli (same source as latest) - should squash in DB assert {:ok, wf3} = WorkflowVersions.record_version(wf2, @c, "cli") - # Version history keeps all entries (doesn't remove old ones) - assert wf3.version_history == ["app:#{@a}", "cli:#{@c}"] + # History shows squashing happened (old cli version replaced) + assert get_history(wf.id) == ["app:#{@a}", "cli:#{@c}"] # Still 2 rows (deleted @b, added @c in database) assert count_rows(wf.id) == 2 @@ -104,16 +113,16 @@ defmodule Lightning.WorkflowVersionsTest do # Build up a history assert {:ok, wf1} = WorkflowVersions.record_version(wf, @a, "app") assert {:ok, wf2} = WorkflowVersions.record_version(wf1, @b, "cli") - assert wf2.version_history == ["app:#{@a}", "cli:#{@b}"] + assert get_history(wf.id) == ["app:#{@a}", "cli:#{@b}"] # Multiple squashes from cli assert {:ok, wf3} = WorkflowVersions.record_version(wf2, @c, "cli") # @b replaced by @c - assert 
wf3.version_history == ["app:#{@a}", "cli:#{@c}"] + assert get_history(wf.id) == ["app:#{@a}", "cli:#{@c}"] assert {:ok, wf4} = WorkflowVersions.record_version(wf3, @d, "cli") # @c replaced by @d - assert wf4.version_history == ["app:#{@a}", "cli:#{@d}"] + assert get_history(wf.id) == ["app:#{@a}", "cli:#{@d}"] # Only @a and @d should exist in database assert count_rows(wf.id) == 2 @@ -133,7 +142,7 @@ defmodule Lightning.WorkflowVersionsTest do assert {:ok, wf4} = WorkflowVersions.record_version(wf3, @d, "cli") # All versions should be preserved - assert wf4.version_history == [ + assert get_history(wf.id) == [ "app:#{@a}", "cli:#{@b}", "app:#{@c}", @@ -154,19 +163,14 @@ defmodule Lightning.WorkflowVersionsTest do # First version should work normally assert {:ok, wf1} = WorkflowVersions.record_version(wf, @a, "app") - assert wf1.version_history == ["app:#{@a}"] + assert get_history(wf.id) == ["app:#{@a}"] assert count_rows(wf.id) == 1 end end describe "history_for/1" do - test "uses version_history array when present" do - wf = insert(:workflow, version_history: ["app:#{@a}", "cli:#{@b}"]) - assert WorkflowVersions.history_for(wf) == ["app:#{@a}", "cli:#{@b}"] - end - - test "falls back to table ordered by inserted_at, id when array empty" do - wf = insert(:workflow, version_history: []) + test "queries table ordered by inserted_at, id" do + wf = insert(:workflow) # Insert with deterministic inserted_at values t0 = DateTime.utc_now(:microsecond) @@ -187,20 +191,16 @@ defmodule Lightning.WorkflowVersionsTest do "app:#{@c}" ] end - end - describe "latest_hash/1" do - test "uses List.last(version_history) when present" do - wf = - insert(:workflow, - version_history: ["app:#{@a}", "cli:#{@b}", "app:#{@c}"] - ) - - assert WorkflowVersions.latest_hash(wf) == "app:#{@c}" + test "returns empty list when no versions exist" do + wf = insert(:workflow) + assert WorkflowVersions.history_for(wf) == [] end + end - test "queries table when version_history empty; deterministic by inserted_at desc, id desc" do - wf = insert(:workflow, version_history: []) + describe "latest_hash/1" do + test "queries table; deterministic by inserted_at desc, id desc" do + wf = insert(:workflow) t0 = DateTime.utc_now(:microsecond) t1 = DateTime.add(t0, 1, :microsecond) @@ -216,30 +216,10 @@ defmodule Lightning.WorkflowVersionsTest do assert WorkflowVersions.latest_hash(wf) == "app:#{@c}" end - end - - describe "reconcile_history!/1" do - test "rebuilds version_history from workflow_versions and persists it" do - wf = insert(:workflow, version_history: []) - t0 = DateTime.utc_now(:microsecond) - t1 = DateTime.add(t0, 1, :microsecond) - t2 = DateTime.add(t0, 2, :microsecond) - - Repo.insert_all(WorkflowVersion, [ - %{workflow_id: wf.id, hash: @a, source: "app", inserted_at: t0}, - %{workflow_id: wf.id, hash: @b, source: "cli", inserted_at: t1}, - %{workflow_id: wf.id, hash: @c, source: "app", inserted_at: t2} - ]) - - updated = WorkflowVersions.reconcile_history!(wf) - assert updated.version_history == ["app:#{@a}", "cli:#{@b}", "app:#{@c}"] - - assert Repo.reload!(%Workflow{id: wf.id}).version_history == [ - "app:#{@a}", - "cli:#{@b}", - "app:#{@c}" - ] + test "returns nil when no versions exist" do + wf = insert(:workflow) + assert WorkflowVersions.latest_hash(wf) == nil end end @@ -655,8 +635,8 @@ defmodule Lightning.WorkflowVersionsTest do end describe "concurrency safety (no duplicate append under contention)" do - test "many concurrent record_version calls append only once and insert one row" do - wf = 
insert(:workflow, version_history: []) + test "many concurrent record_version calls insert only one row" do + wf = insert(:workflow) # spawn N tasks that block on a :go message, then all call record_version tasks = @@ -683,11 +663,9 @@ defmodule Lightning.WorkflowVersionsTest do _ -> false end) - # Only one row in versions table; version_history has the hash once + # Only one row in versions table assert count_rows(wf.id) == 1 - - wf_reloaded = Repo.reload!(%Workflow{id: wf.id}) - assert wf_reloaded.version_history == ["app:#{@a}"] + assert get_history(wf.id) == ["app:#{@a}"] end end end diff --git a/test/lightning/workflows/workflow_test.exs b/test/lightning/workflows/workflow_test.exs index efccdec483..af2a953ade 100644 --- a/test/lightning/workflows/workflow_test.exs +++ b/test/lightning/workflows/workflow_test.exs @@ -172,9 +172,4 @@ defmodule Lightning.Workflows.WorkflowTest do assert updated.deleted_at end end - - test "version_history defaults to []" do - wf = insert(:workflow) - assert wf.version_history == [] - end end diff --git a/test/lightning/workflows_test.exs b/test/lightning/workflows_test.exs index efde9e029f..cd8dace486 100644 --- a/test/lightning/workflows_test.exs +++ b/test/lightning/workflows_test.exs @@ -134,18 +134,16 @@ defmodule Lightning.WorkflowsTest do valid_attrs = %{name: "versioned-workflow", project_id: project.id} {:ok, workflow} = Workflows.save_workflow(valid_attrs, user) - # Reload to get updated version_history - workflow = Repo.reload!(workflow) - # Check that a version was recorded - assert length(workflow.version_history) == 1 + history = Lightning.WorkflowVersions.history_for(workflow) + assert length(history) == 1 # Verify the version exists in the database version = Lightning.Workflows.WorkflowVersion |> Repo.get_by!(workflow_id: workflow.id) - assert "#{version.source}:#{version.hash}" == hd(workflow.version_history) + assert "#{version.source}:#{version.hash}" == hd(history) assert version.source == "app" end From 75c3b3568e14e490befb3cb7518c1e301592832c Mon Sep 17 00:00:00 2001 From: Frank Midigo Date: Fri, 6 Feb 2026 12:00:34 +0300 Subject: [PATCH 07/11] release v2.15.13 --- CHANGELOG.md | 2 ++ mix.exs | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c53e20216f..587f8c749a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,8 @@ and this project adheres to ### Fixed +## [2.15.13] - 2026-02-06 + ## [2.15.13-pre1] - 2026-02-05 ### Fixed diff --git a/mix.exs b/mix.exs index 4ac57ffc46..b7569004a8 100644 --- a/mix.exs +++ b/mix.exs @@ -4,7 +4,7 @@ defmodule Lightning.MixProject do def project do [ app: :lightning, - version: "2.15.13-pre1", + version: "2.15.13", elixir: "~> 1.18", elixirc_paths: elixirc_paths(Mix.env()), elixirc_options: [ From e69a1a9da065880cd2c73ed5b3a96dcd60f32261 Mon Sep 17 00:00:00 2001 From: Stuart Corbishley Date: Fri, 6 Feb 2026 17:01:55 +0200 Subject: [PATCH 08/11] Refactor CircleCI to build-then-fan-out pattern (#4378) * Refactor CircleCI to build-then-fan-out pattern Previously, all 6 CI jobs ran the full build independently in parallel, causing cache race conditions and redundant compilation that contributed to flaky tests. 
Now uses a single compile job that persists to workspace, with 6 lightweight check jobs that attach the pre-built workspace: - compile: Full build, deps, PLT - runs once - check_formatting, check_credo, check_dialyzer, check_sobelow: Fast checks - test_elixir, test_javascript: Test suites Benefits: - Eliminates cache contention between parallel jobs - Single compilation instead of 6x redundant builds - Faster feedback on simple failures - More reliable and debuggable CI runs Also adds reusable executors and commands for cleaner config. * Update changelog for CircleCI refactor Add entry for PR #4378 * Fix workspace persist path error Remove ~/.mix from persist_to_workspace paths - it's outside the workspace root and already handled by the deps cache. * Fix git safe.directory error during checkout Add git safe.directory config before checkout to avoid "dubious ownership" error when running as root in the lightning user's home. * Install Hex/Rebar in downstream jobs ~/.mix isn't part of the workspace, so downstream jobs need to install Hex and Rebar. This is a fast operation that doesn't require compilation. * Add Node.js to test_elixir job Lightning.AdaptorService calls npm during startup, so Node must be available for Elixir tests to run. * Fix ownership after cache restore Cache is restored as root, so _build and deps directories need chown before lightning user can compile. * Add libsodium to test_elixir job The enacl NIF requires libsodium at runtime. * Fix cache path for Hex/Rebar ~/.mix expands to /root/.mix but Hex is installed to /home/lightning/.mix. Use explicit path and bump cache version to v7 for clean slate. * Fix ownership of .mix directory after cache restore Cache restores /home/lightning/.mix as root. Need to chown all of /home/lightning, not just the project directory. 
* Fix rebar install and consolidate lint checks into single job - Fix rebar3 installing for root instead of lightning user, which was causing all Erlang deps to recompile on every CI run - Combine formatting, credo, and sobelow into single lint job to reduce environment spin-up overhead (~30s per job saved) - Use when: always to run all lint checks even if one fails, with sentinel file to track failures * Use date-based cache keys instead of version numbers * Move changelog entry --- .circleci/config.yml | 276 ++++++++++++++++++++++++++++++------------- CHANGELOG.md | 4 + 2 files changed, 199 insertions(+), 81 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index e3fa7bc1ad..fe81dd272c 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -3,6 +3,31 @@ version: 2.1 orbs: codecov: codecov/codecov@1.0.5 +# Shared configuration +defaults: + elixir_version: &elixir_version "1.18.3-otp-27" + nodejs_version: &nodejs_version "22.12.0" + docker_elixir: &docker_elixir + - image: elixir:1.18.3-otp-27 + docker_elixir_postgres: &docker_elixir_postgres + - image: elixir:1.18.3-otp-27 + - image: cimg/postgres:17.3 + environment: &environment + ERL_FLAGS: +S 4:4 + ASSERT_RECEIVE_TIMEOUT: 1000 + MIX_ENV: test + +executors: + elixir: + docker: *docker_elixir + environment: *environment + working_directory: /home/lightning/project + + elixir_with_postgres: + docker: *docker_elixir_postgres + environment: *environment + working_directory: /home/lightning/project + commands: install_node: description: "Install NodeJS from NodeSource" @@ -17,121 +42,210 @@ commands: curl -fsSL https://deb.nodesource.com/setup_${NODE_MAJOR}.x | bash - apt-get install -y nodejs=<< parameters.version >>-1nodesource1 -jobs: - build: - parameters: - elixir_version: - description: Elixir version - type: string - default: "1.18.3-otp-27" - nodejs_version: - description: NodeJS version - type: string - default: "22.12.0" - execute: - description: What steps to execute after build - type: steps - - parallelism: 1 - docker: - - image: elixir:<< parameters.elixir_version >> - - image: cimg/postgres:17.3 - environment: - ERL_FLAGS: +S 4:4 - ASSERT_RECEIVE_TIMEOUT: 1000 - MIX_ENV: test - working_directory: /home/lightning/project + setup_lightning_user: + description: "Create lightning user and configure sudo" + steps: + - run: + name: "Create lightning user" + command: adduser --home /home/lightning --system lightning + - run: + name: "Install sudo and configure environment passthrough" + command: | + apt-get update && apt-get install -y sudo + echo 'Defaults env_keep += "ERL_FLAGS ASSERT_RECEIVE_TIMEOUT MIX_ENV"' | \ + sudo EDITOR='tee -a' visudo + - run: + name: "Configure git safe directory" + command: git config --global --add safe.directory /home/lightning/project + attach_built_workspace: + description: "Attach workspace and restore ownership" steps: - - run: adduser --home /home/lightning --system lightning + - attach_workspace: + at: /home/lightning/project + - run: + name: "Restore ownership after workspace attach" + command: chown -R lightning /home/lightning + - run: + name: "Install Hex and Rebar" + command: sudo -u lightning mix local.hex --force && sudo -u lightning mix local.rebar --force + +jobs: + # ============================================================================ + # COMPILE JOB - Builds everything once, persists to workspace + # ============================================================================ + compile: + executor: elixir + steps: + - setup_lightning_user - checkout 
+ - run: + name: "Set ownership" + command: chown -R lightning /home/lightning - install_node: - version: << parameters.nodejs_version >> + version: *nodejs_version - run: - name: "Save Elixir and Erlang version for PLT caching" + name: "Save Elixir and Erlang version for caching" command: echo "$ELIXIR_VERSION $OTP_VERSION" | tee .elixir_otp_version - run: name: "Introspect schedulers" command: elixir -v + # Restore dependency cache - restore_cache: keys: - - v5-deps-{{ arch }}-{{ checksum ".elixir_otp_version" }}-{{ checksum "mix.lock" }} - - v5-deps-{{ arch }}-{{ checksum ".elixir_otp_version" }} + - 2026-02-05-deps-{{ arch }}-{{ checksum ".elixir_otp_version" }}-{{ checksum "mix.lock" }} + - 2026-02-05-deps-{{ arch }}-{{ checksum ".elixir_otp_version" }} + + - run: + name: "Fix ownership after cache restore" + command: chown -R lightning /home/lightning + - run: - name: "Install libsodium and sudo" + name: "Install build dependencies" command: | apt-get update && apt-get install -y \ build-essential \ cmake \ - libsodium-dev \ - sudo - - run: | - echo 'Defaults env_keep += "ERL_FLAGS ASSERT_RECEIVE_TIMEOUT MIX_ENV"' | \ - sudo EDITOR='tee -a' visudo - - - run: chown -R lightning /home/lightning - - run: sudo -u lightning mix local.hex --force && mix local.rebar --force - - run: cd assets; sudo -u lightning npm install --force - - run: sudo -u lightning mix do deps.get --only test, deps.compile, compile - - run: sudo -u lightning mix lightning.install_runtime + libsodium-dev + + - run: + name: "Install Hex and Rebar" + command: sudo -u lightning mix local.hex --force && sudo -u lightning mix local.rebar --force + + - run: + name: "Install Node dependencies" + command: cd assets && sudo -u lightning npm install --force + + - run: + name: "Compile Elixir dependencies and application" + command: sudo -u lightning mix do deps.get --only test, deps.compile, compile + + - run: + name: "Install runtime dependencies" + command: sudo -u lightning mix lightning.install_runtime - save_cache: - key: v5-deps-{{ arch }}-{{ checksum ".elixir_otp_version" }}-{{ checksum "mix.lock" }} + key: 2026-02-05-deps-{{ arch }}-{{ checksum ".elixir_otp_version" }}-{{ checksum "mix.lock" }} paths: - _build - deps - - ~/.mix + - /home/lightning/.mix + # Restore and build PLT for Dialyzer - restore_cache: name: "Restore PLT cache" keys: - - v5-plt-{{ arch }}-{{ checksum ".elixir_otp_version" }}-{{ checksum "mix.lock" }} - - v5-plt-{{ arch }}-{{ checksum ".elixir_otp_version" }} + - 2026-02-05-plt-{{ arch }}-{{ checksum ".elixir_otp_version" }}-{{ checksum "mix.lock" }} + - 2026-02-05-plt-{{ arch }}-{{ checksum ".elixir_otp_version" }} + + - run: + name: "Ensure PLT directory exists" + command: mkdir -p priv/plts && chown -R lightning priv/plts + + - run: + name: "Build Dialyzer PLT" + command: sudo -u lightning env MIX_ENV=test mix dialyzer --plt - - run: mkdir -p priv/plts && chown -R lightning priv/plts - - run: sudo -u lightning env MIX_ENV=test mix dialyzer --plt - save_cache: - key: v5-plt-{{ arch }}-{{ checksum ".elixir_otp_version" }}-{{ checksum "mix.lock" }} + key: 2026-02-05-plt-{{ arch }}-{{ checksum ".elixir_otp_version" }}-{{ checksum "mix.lock" }} paths: - priv/plts - - steps: << parameters.execute >> + # Persist everything to workspace for downstream jobs + - persist_to_workspace: + root: /home/lightning/project + paths: + - . 
+ + # ============================================================================ + # CHECK JOBS - Fast, parallel jobs that use the pre-built workspace + # ============================================================================ + lint: + executor: elixir + steps: + - setup_lightning_user + - attach_built_workspace + - run: + name: "Check code formatting" + command: | + sudo -u lightning mix format --check-formatted || touch /tmp/lint_failed + - run: + name: "Check code style with Credo" + when: always + command: | + sudo -u lightning mix credo --strict --all || touch /tmp/lint_failed + - run: + name: "Check for security vulnerabilities" + when: always + command: | + sudo -u lightning mix sobelow --threshold medium || touch /tmp/lint_failed + - run: + name: "Verify all checks passed" + when: always + command: | + if [ -f /tmp/lint_failed ]; then + echo "One or more lint checks failed" + exit 1 + fi + + check_dialyzer: + executor: elixir + steps: + - setup_lightning_user + - attach_built_workspace + - run: + name: "Run Dialyzer type checking" + command: sudo -u lightning mix dialyzer + + test_elixir: + executor: elixir_with_postgres + steps: + - setup_lightning_user + - attach_built_workspace + - install_node: + version: *nodejs_version + - run: + name: "Install libsodium" + command: apt-get update && apt-get install -y libsodium-dev + - run: + name: "Setup test database" + command: sudo -u lightning mix do ecto.create, ecto.migrate + - run: + name: "Run Elixir tests" + command: sudo -u lightning ./bin/ci_tests + - codecov/upload: + file: test/reports/coverage.json + - store_test_results: + path: test/reports/ + + test_javascript: + executor: elixir + steps: + - setup_lightning_user + - attach_built_workspace + - install_node: + version: *nodejs_version + - run: + name: "Run JavaScript tests" + command: cd assets && sudo -u lightning npm run test-report + - store_test_results: + path: test/reports/ workflows: pre-flight checks: jobs: - - build: - name: "Check code formatting" - execute: - - run: sudo -u lightning mix format --check-formatted - - build: - name: "Check code style" - execute: - - run: sudo -u lightning mix credo --strict --all - - build: - name: "Type check" - execute: - - run: sudo -u lightning mix dialyzer - - build: - name: "Check for security vulnerabilities" - execute: - - run: sudo -u lightning mix sobelow --threshold medium - - build: - name: "Check Elixir tests (codecov)" - execute: - - run: sudo -u lightning mix do ecto.create, ecto.migrate - - run: - command: sudo -u lightning ./bin/ci_tests - - codecov/upload: - file: test/reports/coverage.json - - store_test_results: - path: test/reports/ - - build: - name: "Check Javascript tests" - execute: - - run: cd assets; sudo -u lightning npm install && npm run test-report - - store_test_results: - path: test/reports/ + # First: compile everything once + - compile + + # Then: run all checks in parallel using the compiled workspace + - lint: + requires: [compile] + - check_dialyzer: + requires: [compile] + - test_elixir: + requires: [compile] + - test_javascript: + requires: [compile] diff --git a/CHANGELOG.md b/CHANGELOG.md index 587f8c749a..acebc29d10 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,10 @@ and this project adheres to ### Changed +- Refactor CircleCI to build-then-fan-out pattern, compiling once then running + checks in parallel to eliminate cache race conditions and reduce flaky tests + [#4378](https://github.com/OpenFn/lightning/pull/4378) + ### Fixed ## [2.15.13] - 2026-02-06 From 
c792dae72111cea89440ebf093c7945b9f691abd Mon Sep 17 00:00:00 2001 From: Stuart Corbishley Date: Mon, 9 Feb 2026 06:30:12 +0200 Subject: [PATCH 09/11] Fix StaleEntryError on edge retargeting when jobs are replaced (#4390) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix .gitignore to preserve test fixture adaptor_registry_cache.json The blanket ignore rule for adaptor_registry_cache.json was causing the test fixture copy to be excluded by tools that respect .gitignore (e.g. rsync), leading to test failures. * Fix cascade delete race condition causing StaleEntryError on edge retargeting When a job is deleted and its edge retargeted in the same save, PostgreSQL's ON DELETE CASCADE removes the edge before Ecto can update it, causing a StaleEntryError. Change edge FK constraints from CASCADE to partial SET NULL (PG 15+) so deleted jobs nullify edge references instead of cascading. Add orphan tracking to both save paths (save_workflow and provisioner) that precisely captures which edges were affected, then cleans up only those edges after the save completes - preserving legitimate NULL target edges. * Update provisioner test for partial SET NULL FK behavior The test previously expected StaleEntryError when deleting a job and its edge in the same import. With partial SET NULL FKs, the operation now succeeds correctly - the FK nullifies the edge reference instead of cascade-deleting it, allowing Ecto to process the edge deletion. * More tests * Clean up provisioner_test.exs * Handle source_job edge orphaning when jobs are deleted Previously only target_job_id references were tracked and cleaned up when jobs were deleted. Edges where the deleted job was the source (source_job_id) were nullified but not tracked for cleanup, leaving orphaned edges with no source. This matters when replacing the first job in a chain — both the trigger edge target and the downstream edge source need handling. - Track source-orphaned edge IDs alongside target-orphaned in both save_workflow and provisioner paths - Broaden cleanup to delete edges missing either target or source (without trigger fallback) - Restore two unrelated tests removed from provisioner "new project" block (collections, trigger edge enabled) - Add 6 new tests covering source-job deletion and retargeting scenarios across both save paths * Add changelog entry for edge retargeting StaleEntryError fix --- .gitignore | 1 + CHANGELOG.md | 4 + lib/lightning/projects/provisioner.ex | 47 ++ lib/lightning/workflows.ex | 80 +++ ...06085810_change_edge_job_fks_to_nilify.exs | 57 ++ test/lightning/projects/provisioner_test.exs | 592 ++++++++++-------- test/lightning/workflows_test.exs | 407 ++++++++++++ 7 files changed, 925 insertions(+), 263 deletions(-) create mode 100644 priv/repo/migrations/20260206085810_change_edge_job_fks_to_nilify.exs diff --git a/.gitignore b/.gitignore index b7bcd12c8a..f8ff37ac20 100644 --- a/.gitignore +++ b/.gitignore @@ -27,6 +27,7 @@ lightning-*.tar # Ignore adaptor registry cache adaptor_registry_cache.json +!test/fixtures/adaptor_registry_cache.json # Ignore assets that are produced by build tools. /priv/static/images/adaptors/ diff --git a/CHANGELOG.md b/CHANGELOG.md index acebc29d10..7c77bc0a6a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -25,6 +25,10 @@ and this project adheres to ### Fixed +- Fixed StaleEntryError when saving workflows where a job is replaced and its + edge retargeted to a new job (e.g. 
via AI assistant) + [#4383](https://github.com/OpenFn/lightning/issues/4383) + ## [2.15.13] - 2026-02-06 ## [2.15.13-pre1] - 2026-02-05 diff --git a/lib/lightning/projects/provisioner.ex b/lib/lightning/projects/provisioner.ex index 809c1ab304..49f33d0c51 100644 --- a/lib/lightning/projects/provisioner.ex +++ b/lib/lightning/projects/provisioner.ex @@ -65,8 +65,11 @@ defmodule Lightning.Projects.Provisioner do with :ok <- VersionControlUsageLimiter.limit_github_sync(project.id), project_changeset <- build_import_changeset(project, user_or_repo_connection, data), + edges_to_cleanup <- + edges_referencing_deleted_jobs(project_changeset), {:ok, %{workflows: workflows} = project} <- Repo.insert_or_update(project_changeset, allow_stale: allow_stale), + :ok <- cleanup_orphaned_edges(edges_to_cleanup), :ok <- handle_collection_deletion(project_changeset), updated_project <- preload_dependencies(project), {:ok, _changes} <- @@ -271,6 +274,50 @@ defmodule Lightning.Projects.Provisioner do end end + # Before import, find edges referencing a job being deleted (as target or source). + # Returns edge IDs so we can clean them up after the FK cascade sets NULL. + defp edges_referencing_deleted_jobs(project_changeset) do + deleted_job_ids = + project_changeset + |> get_assoc(:workflows) + |> Enum.flat_map(fn wf_cs -> + wf_cs + |> get_assoc(:jobs) + |> Enum.filter(fn job_cs -> job_cs.action == :delete end) + |> Enum.map(&get_field(&1, :id)) + end) + |> Enum.reject(&is_nil/1) + + if deleted_job_ids == [] do + [] + else + from(e in Edge, + where: + e.target_job_id in ^deleted_job_ids or + e.source_job_id in ^deleted_job_ids, + select: e.id + ) + |> Repo.all() + end + end + + # After import, remove edges that were orphaned by job deletion. + # Only deletes edges whose IDs we captured before the FK cascade, + # and only if they still have a NULL FK (target or source without trigger). + defp cleanup_orphaned_edges([]), do: :ok + + defp cleanup_orphaned_edges(edge_ids) do + from(e in Edge, + where: e.id in ^edge_ids, + where: + is_nil(e.target_job_id) or + (is_nil(e.source_job_id) and is_nil(e.source_trigger_id)) + ) + |> Repo.delete_all() + + :ok + end + defp handle_collection_deletion(project_changeset) do deleted_size = project_changeset diff --git a/lib/lightning/workflows.ex b/lib/lightning/workflows.ex index a3080f3b89..ab8493729c 100644 --- a/lib/lightning/workflows.ex +++ b/lib/lightning/workflows.ex @@ -155,7 +155,18 @@ defmodule Lightning.Workflows do {:error, :workflow_deleted} end end) + |> Multi.run(:orphan_deleted_jobs, fn repo, _changes -> + orphan_jobs_being_deleted(repo, changeset) + end) |> Multi.insert_or_update(:workflow, changeset) + |> Multi.run(:cleanup_orphaned_edges, fn repo, + %{ + workflow: workflow, + orphan_deleted_jobs: + orphaned_edge_ids + } -> + cleanup_orphaned_edges(repo, workflow.id, orphaned_edge_ids) + end) |> then(fn multi -> if changeset.changes == %{} do multi @@ -219,6 +230,75 @@ defmodule Lightning.Workflows do |> save_workflow(actor, opts) end + # Nullifies edge FK references to jobs that are about to be deleted. + # This prevents PostgreSQL's cascade delete from removing edges that Ecto + # is trying to update (the retargeting race condition). + # + # Returns the IDs of edges whose target_job_id or source_job_id was nullified, + # so that cleanup_orphaned_edges can precisely remove only those edges (if they + # weren't retargeted by the changeset). 
+ defp orphan_jobs_being_deleted(repo, changeset) do + deleted_job_ids = + changeset + |> Ecto.Changeset.get_change(:jobs, []) + |> Enum.filter(fn cs -> cs.action in [:replace, :delete] end) + |> Enum.map(fn cs -> cs.data.id end) + + if deleted_job_ids == [] do + {:ok, []} + else + workflow_id = changeset.data.id + + {_target_count, target_orphaned_ids} = + from(e in Edge, + where: e.workflow_id == ^workflow_id, + where: e.target_job_id in ^deleted_job_ids, + select: e.id + ) + |> repo.update_all(set: [target_job_id: nil]) + + {_source_count, source_orphaned_ids} = + from(e in Edge, + where: e.workflow_id == ^workflow_id, + where: e.source_job_id in ^deleted_job_ids, + select: e.id + ) + |> repo.update_all(set: [source_job_id: nil]) + + orphaned_edge_ids = + Enum.uniq(target_orphaned_ids ++ source_orphaned_ids) + + Logger.debug(fn -> + "Orphaned #{length(target_orphaned_ids)} target and #{length(source_orphaned_ids)} source edge refs for deleted jobs: #{inspect(deleted_job_ids)}" + end) + + {:ok, orphaned_edge_ids} + end + end + + # Removes edges that were orphaned by job deletion and not retargeted. + # Only deletes edges whose IDs were returned by orphan_jobs_being_deleted + # AND that still have a NULL FK (target_job_id or source without trigger). + defp cleanup_orphaned_edges(_repo, _workflow_id, []), do: {:ok, 0} + + defp cleanup_orphaned_edges(repo, workflow_id, orphaned_edge_ids) do + {count, _} = + from(e in Edge, + where: e.workflow_id == ^workflow_id, + where: e.id in ^orphaned_edge_ids, + where: + is_nil(e.target_job_id) or + (is_nil(e.source_job_id) and is_nil(e.source_trigger_id)) + ) + |> repo.delete_all() + + Logger.debug(fn -> + "Cleaned up #{count} orphaned edges for workflow #{workflow_id}" + end) + + {:ok, count} + end + @spec publish_kafka_trigger_events(Ecto.Changeset.t(Workflow.t())) :: :ok def publish_kafka_trigger_events(changeset) do changeset diff --git a/priv/repo/migrations/20260206085810_change_edge_job_fks_to_nilify.exs b/priv/repo/migrations/20260206085810_change_edge_job_fks_to_nilify.exs new file mode 100644 index 0000000000..f7dd1e4cb0 --- /dev/null +++ b/priv/repo/migrations/20260206085810_change_edge_job_fks_to_nilify.exs @@ -0,0 +1,57 @@ +defmodule Lightning.Repo.Migrations.ChangeEdgeJobFksToNilify do + use Ecto.Migration + + def up do + # Drop existing FK constraints that use on_delete: :delete_all + # These cause cascade deletes that race with Ecto's edge updates + drop constraint(:workflow_edges, "workflow_edges_target_job_id_fkey") + drop constraint(:workflow_edges, "workflow_edges_source_job_id_fkey") + + # Recreate FK constraints with ON DELETE SET NULL on only the job ID column. + # We can't use Ecto's :nilify_all because compound FKs (with workflow_id) + # would nilify workflow_id too, which violates NOT NULL. + # PostgreSQL 15+ supports partial SET NULL: ON DELETE SET NULL (column). 
+ execute """ + ALTER TABLE workflow_edges + ADD CONSTRAINT workflow_edges_target_job_id_fkey + FOREIGN KEY (target_job_id, workflow_id) + REFERENCES jobs(id, workflow_id) + ON DELETE SET NULL (target_job_id) + """ + + execute """ + ALTER TABLE workflow_edges + ADD CONSTRAINT workflow_edges_source_job_id_fkey + FOREIGN KEY (source_job_id, workflow_id) + REFERENCES jobs(id, workflow_id) + ON DELETE SET NULL (source_job_id) + """ + end + + def down do + drop constraint(:workflow_edges, "workflow_edges_target_job_id_fkey") + drop constraint(:workflow_edges, "workflow_edges_source_job_id_fkey") + + # Clean up any orphaned edges before restoring cascade constraints + execute """ + DELETE FROM workflow_edges + WHERE target_job_id IS NULL OR source_job_id IS NULL + """ + + alter table(:workflow_edges) do + modify :target_job_id, + references(:jobs, + on_delete: :delete_all, + type: :binary_id, + with: [workflow_id: :workflow_id] + ) + + modify :source_job_id, + references(:jobs, + on_delete: :delete_all, + type: :binary_id, + with: [workflow_id: :workflow_id] + ) + end + end +end diff --git a/test/lightning/projects/provisioner_test.exs b/test/lightning/projects/provisioner_test.exs index a4905e6c58..55a85f13f4 100644 --- a/test/lightning/projects/provisioner_test.exs +++ b/test/lightning/projects/provisioner_test.exs @@ -129,7 +129,7 @@ defmodule Lightning.Projects.ProvisionerTest do describe "import_document/2 with a new project" do test "with valid data" do Mox.verify_on_exit!() - user = insert(:user) + %{id: user_id} = user = insert(:user) credential = insert(:credential, name: "Test Credential", user: user) @@ -208,6 +208,21 @@ defmodule Lightning.Projects.ProvisionerTest do |> Enum.any?(fn pu -> pu.user_id == user.id && pu.role == :owner end) + + # Verify audit trail for snapshot creation + %{id: snapshot_id} = Snapshot.get_current_for(workflow) + + audit = Repo.one!(from a in Audit, where: a.event == "snapshot_created") + + assert %{ + item_id: ^workflow_id, + actor_id: ^user_id, + changes: %{ + after: %{ + "snapshot_id" => ^snapshot_id + } + } + } = audit end test "importing with nil project delegates to empty Project struct" do @@ -232,75 +247,6 @@ defmodule Lightning.Projects.ProvisionerTest do assert length(project.workflows) == 1 end - test "audit the creation of a snapshot" do - Mox.verify_on_exit!() - %{id: user_id} = user = insert(:user) - - credential = insert(:credential, name: "Test Credential", user: user) - - %{ - body: %{"workflows" => [workflow]} = body, - workflows: [ - %{ - id: workflow_id, - first_job_id: first_job_id - } - ] - } = valid_document() - - project_credential_id = Ecto.UUID.generate() - - credentials_payload = - [ - %{ - "id" => project_credential_id, - "name" => credential.name, - "owner" => user.email - } - ] - - updated_workflow_jobs = - Enum.map(workflow["jobs"], fn job -> - if job["id"] == first_job_id do - job - |> Map.put("project_credential_id", project_credential_id) - else - job - end - end) - - body_with_credentials = - body - |> Map.put("project_credentials", credentials_payload) - |> Map.put("workflows", [%{workflow | "jobs" => updated_workflow_jobs}]) - - Mox.stub( - Lightning.Extensions.MockUsageLimiter, - :limit_action, - fn _action, _context -> :ok end - ) - - Provisioner.import_document( - %Lightning.Projects.Project{}, - user, - body_with_credentials - ) - - %{id: snapshot_id} = Repo.one!(Snapshot) - - audit = Repo.one!(from a in Audit, where: a.event == "snapshot_created") - - assert %{ - item_id: ^workflow_id, - actor_id: ^user_id, - 
changes: %{ - after: %{ - "snapshot_id" => ^snapshot_id - } - } - } = audit - end - test "audits the creation of workflows" do %{id: user_id} = user = insert(:user) @@ -424,21 +370,22 @@ defmodule Lightning.Projects.ProvisionerTest do describe "import_document/2 with an existing project" do setup do Mox.verify_on_exit!() - %{project: ProjectsFixtures.project_fixture(), user: insert(:user)} - end + project = ProjectsFixtures.project_fixture() + user = insert(:user) - test "doesn't add another project user", %{ - project: %{id: project_id} = project, - user: user - } do Mox.stub( Lightning.Extensions.MockUsageLimiter, :limit_action, - fn _action, %{project_id: ^project_id} -> - :ok - end + fn _action, _context -> :ok end ) + %{project: project, user: user} + end + + test "doesn't add another project user", %{ + project: project, + user: user + } do %{body: body} = valid_document(project.id) {:ok, project} = Provisioner.import_document(project, user, body) @@ -462,17 +409,9 @@ defmodule Lightning.Projects.ProvisionerTest do end test "audits the creation of a snapshot", %{ - project: %{id: project_id} = project, + project: project, user: %{id: user_id} = user } do - Mox.stub( - Lightning.Extensions.MockUsageLimiter, - :limit_action, - fn _action, %{project_id: ^project_id} -> - :ok - end - ) - %{body: body, workflows: [%{id: workflow_id}]} = valid_document(project.id) {:ok, _project} = Provisioner.import_document(project, user, body) @@ -493,17 +432,9 @@ defmodule Lightning.Projects.ProvisionerTest do end test "changing, adding records", %{ - project: %{id: project_id} = project, + project: project, user: user } do - Mox.stub( - Lightning.Extensions.MockUsageLimiter, - :limit_action, - fn _action, %{project_id: ^project_id} -> - :ok - end - ) - %{body: body, workflows: [%{id: workflow_id}]} = valid_document(project.id) {:ok, project} = Provisioner.import_document(project, user, body) @@ -533,7 +464,7 @@ defmodule Lightning.Projects.ProvisionerTest do body = body |> Map.put("name", "test-project-renamed") - |> add_job_to_document(workflow_id, %{ + |> add_entity_to_workflow(workflow_id, "jobs", %{ "id" => third_job_id, "name" => "third-job", "adaptor" => "@openfn/language-common@latest", @@ -580,17 +511,9 @@ defmodule Lightning.Projects.ProvisionerTest do end test "adding a record from another project or workflow", %{ - project: %{id: project_id} = project, + project: project, user: user } do - Mox.stub( - Lightning.Extensions.MockUsageLimiter, - :limit_action, - fn _action, %{project_id: ^project_id} -> - :ok - end - ) - %{body: body, workflows: [%{id: workflow_id}]} = valid_document(project.id) {:ok, project} = Provisioner.import_document(project, user, body) @@ -672,17 +595,9 @@ defmodule Lightning.Projects.ProvisionerTest do end test "fails when an edge has no source", %{ - project: %{id: project_id} = project, + project: project, user: user } do - Mox.stub( - Lightning.Extensions.MockUsageLimiter, - :limit_action, - fn _action, %{project_id: ^project_id} -> - :ok - end - ) - %{body: body, workflows: [%{id: workflow_id}]} = valid_document(project.id) %{id: other_edge_id} = Lightning.Factories.insert(:edge) @@ -715,15 +630,7 @@ defmodule Lightning.Projects.ProvisionerTest do } end - test "removing a record", %{project: %{id: project_id} = project, user: user} do - Mox.stub( - Lightning.Extensions.MockUsageLimiter, - :limit_action, - fn _action, %{project_id: ^project_id} -> - :ok - end - ) - + test "removing a record", %{project: project, user: user} do %{ body: body, workflows: 
[%{second_job_id: second_job_id}] @@ -770,17 +677,9 @@ defmodule Lightning.Projects.ProvisionerTest do end test "removing a workflow", %{ - project: %{id: project_id} = project, + project: project, user: user } do - Mox.stub( - Lightning.Extensions.MockUsageLimiter, - :limit_action, - fn _action, %{project_id: ^project_id} -> - :ok - end - ) - %{ body: body, workflows: [%{id: workflow_id}] @@ -807,17 +706,9 @@ defmodule Lightning.Projects.ProvisionerTest do end test "marking a new/changed record for deletion", %{ - project: %{id: project_id} = project, + project: project, user: user } do - Mox.stub( - Lightning.Extensions.MockUsageLimiter, - :limit_action, - fn _action, %{project_id: ^project_id} -> - :ok - end - ) - body = %{ "id" => project.id, "name" => "test-project", @@ -863,19 +754,11 @@ defmodule Lightning.Projects.ProvisionerTest do end test "sends workflow updated event", %{ - project: %{id: project_id} = project, + project: project, user: user } do %{body: body, workflows: [%{id: workflow_id}]} = valid_document(project.id) - Mox.stub( - Lightning.Extensions.MockUsageLimiter, - :limit_action, - fn _action, %{project_id: ^project_id} -> - :ok - end - ) - Lightning.Workflows.subscribe(project.id) assert {:ok, _project} = @@ -906,7 +789,7 @@ defmodule Lightning.Projects.ProvisionerTest do body_with_modifications = body - |> add_job_to_document(first_workflow_id, %{ + |> add_entity_to_workflow(first_workflow_id, "jobs", %{ "id" => Ecto.UUID.generate(), "name" => "third-job", "adaptor" => "@openfn/language-common@latest", @@ -977,14 +860,6 @@ defmodule Lightning.Projects.ProvisionerTest do project: %{id: project_id} = project, user: user } do - Mox.stub( - Lightning.Extensions.MockUsageLimiter, - :limit_action, - fn _action, %{project_id: ^project_id} -> - :ok - end - ) - %{body: body} = valid_document(project.id) {:ok, project} = Provisioner.import_document(project, user, body) @@ -1012,14 +887,6 @@ defmodule Lightning.Projects.ProvisionerTest do project: %{id: project_id} = project, user: user } do - Mox.stub( - Lightning.Extensions.MockUsageLimiter, - :limit_action, - fn _action, %{project_id: ^project_id} -> - :ok - end - ) - collection = insert(:collection, project: project) collection_id = collection.id new_collection_name = "new-collection-name" @@ -1048,14 +915,6 @@ defmodule Lightning.Projects.ProvisionerTest do project: %{id: project_id} = project, user: user } do - Mox.stub( - Lightning.Extensions.MockUsageLimiter, - :limit_action, - fn _action, %{project_id: ^project_id} -> - :ok - end - ) - collection = insert(:collection, project: project) collection_to_delete = insert(:collection, project: project) @@ -1154,14 +1013,6 @@ defmodule Lightning.Projects.ProvisionerTest do project: %{id: project_id} = project, user: user } do - Mox.stub( - Lightning.Extensions.MockUsageLimiter, - :limit_action, - fn _action, %{project_id: ^project_id} -> - :ok - end - ) - collection = insert(:collection, byte_size_sum: 1000, project: project) collection_to_delete_1 = @@ -1202,12 +1053,6 @@ defmodule Lightning.Projects.ProvisionerTest do project: %{id: project_id} = project, user: user } do - Mox.stub( - Lightning.Extensions.MockUsageLimiter, - :limit_action, - fn _action, _context -> :ok end - ) - %{body: %{"workflows" => [workflow]} = body} = valid_document(project.id) {:ok, project} = Provisioner.import_document(project, user, body) @@ -1251,30 +1096,14 @@ defmodule Lightning.Projects.ProvisionerTest do # 3. 
Sandbox is merged back to parent # The merge should succeed and delete the parent's new edges - # Create parent project with initial workflow - project = insert(:project) - workflow = insert(:workflow, project: project, lock_version: 5) - job1 = insert(:job, workflow: workflow, name: "job1") - trigger = insert(:trigger, workflow: workflow, type: :webhook) - - trigger_edge = - insert(:edge, - workflow: workflow, - source_trigger: trigger, - target_job: job1 - ) - - # Simulate parent modification after sandbox was created: - # add a new job and edge (this would increment lock_version to 6) - job2 = insert(:job, workflow: workflow, name: "job2") - - new_edge = - insert(:edge, workflow: workflow, source_job: job1, target_job: job2) - - # Manually increment lock_version to simulate the modification - workflow = Repo.update!(Ecto.Changeset.change(workflow, lock_version: 6)) - - assert workflow.lock_version == 6 + %{ + project: project, + workflow: workflow, + job1: job1, + trigger: trigger, + trigger_edge: trigger_edge, + job_edge: new_edge + } = merged_workflow_setup() # Now simulate a merge from sandbox that doesn't have job2 # This merged document will try to delete the new edge @@ -1336,16 +1165,79 @@ defmodule Lightning.Projects.ProvisionerTest do assert hd(updated_workflow.jobs).id == job1.id end - test "importing merged workflow without allow_stale raises StaleEntryError", + test "importing merged workflow without allow_stale succeeds when deleting job and edge", %{ user: user } do - # Same setup as above, but WITHOUT allow_stale: true - # This verifies that stale errors are properly raised by default + # This test verifies that deleting a job and its edge in the same + # import works correctly. With partial SET NULL FKs, the job deletion + # nullifies the edge's target_job_id instead of cascade-deleting it, + # so Ecto can then process the edge deletion without a StaleEntryError. + %{ + project: project, + workflow: workflow, + job1: job1, + trigger: trigger, + trigger_edge: trigger_edge + } = merged_workflow_setup() + + # Create merged document that deletes job2 and its edge + merged_document = %{ + "id" => project.id, + "name" => project.name, + "workflows" => [ + %{ + "id" => workflow.id, + "name" => workflow.name, + "lock_version" => workflow.lock_version, + "jobs" => [ + %{ + "id" => job1.id, + "name" => "job1", + "adaptor" => "@openfn/language-common@latest", + "body" => "console.log('from sandbox');" + } + ], + "triggers" => [ + %{ + "id" => trigger.id, + "type" => "webhook" + } + ], + "edges" => [ + %{ + "id" => trigger_edge.id, + "source_trigger_id" => trigger.id, + "target_job_id" => job1.id, + "condition_type" => "always" + } + ] + } + ] + } + + # With partial SET NULL FKs, this succeeds without StaleEntryError + assert {:ok, updated_project} = + Provisioner.import_document(project, user, merged_document) + + # Verify job2 was deleted and only trigger edge remains + reloaded_workflow = + updated_project.workflows + |> Enum.find(&(&1.id == workflow.id)) + + assert length(reloaded_workflow.edges) == 1 + assert length(reloaded_workflow.jobs) == 1 + end + + test "importing document that retargets an edge to a replacement job", + %{user: user} do + # When a job is deleted and a new job takes its place, the edge + # should survive with its original ID pointing to the new target. 
project = insert(:project) - workflow = insert(:workflow, project: project, lock_version: 5) + workflow = insert(:workflow, project: project) job1 = insert(:job, workflow: workflow, name: "job1") + job2 = insert(:job, workflow: workflow, name: "job2") trigger = insert(:trigger, workflow: workflow, type: :webhook) trigger_edge = @@ -1355,63 +1247,226 @@ defmodule Lightning.Projects.ProvisionerTest do target_job: job1 ) - # Add new edge to parent - job2 = insert(:job, workflow: workflow, name: "job2") - - new_edge = - insert(:edge, workflow: workflow, source_job: job1, target_job: job2) + job_edge = + insert(:edge, + workflow: workflow, + source_job: job1, + target_job: job2 + ) - # Increment lock_version - workflow = Repo.update!(Ecto.Changeset.change(workflow, lock_version: 6)) + job3_id = Ecto.UUID.generate() - # Create merged document that tries to delete the new edge - merged_document = %{ + document = %{ "id" => project.id, "name" => project.name, "workflows" => [ %{ "id" => workflow.id, "name" => workflow.name, - "lock_version" => workflow.lock_version, "jobs" => [ %{ "id" => job1.id, "name" => "job1", "adaptor" => "@openfn/language-common@latest", - "body" => "console.log('from sandbox');" + "body" => "fn(state)" + }, + %{ + "id" => job3_id, + "name" => "job3", + "adaptor" => "@openfn/language-common@latest", + "body" => "fn(state)" } ], "triggers" => [ + %{"id" => trigger.id, "type" => "webhook"} + ], + "edges" => [ %{ - "id" => trigger.id, - "type" => "webhook" + "id" => trigger_edge.id, + "source_trigger_id" => trigger.id, + "target_job_id" => job1.id, + "condition_type" => "always" + }, + %{ + "id" => job_edge.id, + "source_job_id" => job1.id, + "target_job_id" => job3_id, + "condition_type" => "on_job_success" + } + ] + } + ] + } + + assert {:ok, updated_project} = + Provisioner.import_document(project, user, document) + + reloaded_workflow = + updated_project.workflows + |> Enum.find(&(&1.id == workflow.id)) + + assert length(reloaded_workflow.jobs) == 2 + refute Enum.any?(reloaded_workflow.jobs, &(&1.id == job2.id)) + assert Enum.any?(reloaded_workflow.jobs, &(&1.id == job3_id)) + + # Edge preserved its ID but retargeted to new job + retargeted = + Enum.find(reloaded_workflow.edges, &(&1.id == job_edge.id)) + + assert retargeted.target_job_id == job3_id + assert retargeted.source_job_id == job1.id + end + + test "deleting source job cleans up orphaned edge via provisioner", + %{user: user} do + project = insert(:project) + workflow = insert(:workflow, project: project) + job_a = insert(:job, workflow: workflow, name: "job-a") + job_b = insert(:job, workflow: workflow, name: "job-b") + trigger = insert(:trigger, workflow: workflow, type: :webhook) + + trigger_edge = + insert(:edge, + workflow: workflow, + source_trigger: trigger, + target_job: job_a + ) + + job_edge = + insert(:edge, + workflow: workflow, + source_job: job_a, + target_job: job_b + ) + + # Delete job_a, remove both edges, keep only job_b + document = %{ + "id" => project.id, + "name" => project.name, + "workflows" => [ + %{ + "id" => workflow.id, + "name" => workflow.name, + "jobs" => [ + %{ + "id" => job_b.id, + "name" => "job-b", + "adaptor" => "@openfn/language-common@latest", + "body" => "fn(state)" + } + ], + "triggers" => [ + %{"id" => trigger.id, "type" => "webhook"} + ], + "edges" => [] + } + ] + } + + assert {:ok, updated_project} = + Provisioner.import_document(project, user, document) + + reloaded_workflow = + updated_project.workflows + |> Enum.find(&(&1.id == workflow.id)) + + assert 
length(reloaded_workflow.jobs) == 1 + assert hd(reloaded_workflow.jobs).id == job_b.id + assert reloaded_workflow.edges == [] + + refute Repo.get(Lightning.Workflows.Edge, trigger_edge.id) + refute Repo.get(Lightning.Workflows.Edge, job_edge.id) + end + + test "replacing first job retargets edges via provisioner", + %{user: user} do + project = insert(:project) + workflow = insert(:workflow, project: project) + job_a = insert(:job, workflow: workflow, name: "job-a") + job_b = insert(:job, workflow: workflow, name: "job-b") + trigger = insert(:trigger, workflow: workflow, type: :webhook) + + trigger_edge = + insert(:edge, + workflow: workflow, + source_trigger: trigger, + target_job: job_a + ) + + job_edge = + insert(:edge, + workflow: workflow, + source_job: job_a, + target_job: job_b + ) + + job_c_id = Ecto.UUID.generate() + + # Delete job_a, add job_c, retarget both edges + document = %{ + "id" => project.id, + "name" => project.name, + "workflows" => [ + %{ + "id" => workflow.id, + "name" => workflow.name, + "jobs" => [ + %{ + "id" => job_c_id, + "name" => "job-c", + "adaptor" => "@openfn/language-common@latest", + "body" => "fn(state)" + }, + %{ + "id" => job_b.id, + "name" => "job-b", + "adaptor" => "@openfn/language-common@latest", + "body" => "fn(state)" } ], + "triggers" => [ + %{"id" => trigger.id, "type" => "webhook"} + ], "edges" => [ %{ "id" => trigger_edge.id, "source_trigger_id" => trigger.id, - "target_job_id" => job1.id, + "target_job_id" => job_c_id, "condition_type" => "always" }, %{ - "id" => new_edge.id, - "delete" => true + "id" => job_edge.id, + "source_job_id" => job_c_id, + "target_job_id" => job_b.id, + "condition_type" => "on_job_success" } ] } ] } - # Without allow_stale, this should raise StaleEntryError - assert_raise Ecto.StaleEntryError, fn -> - Provisioner.import_document(project, user, merged_document) - end + assert {:ok, updated_project} = + Provisioner.import_document(project, user, document) + + reloaded_workflow = + updated_project.workflows + |> Enum.find(&(&1.id == workflow.id)) - # Verify nothing was changed (transaction rolled back) - reloaded_workflow = Repo.reload(workflow) |> Repo.preload([:edges, :jobs]) - assert length(reloaded_workflow.edges) == 2 assert length(reloaded_workflow.jobs) == 2 + assert length(reloaded_workflow.edges) == 2 + + # Trigger edge preserved ID, retargeted to job_c + saved_trigger_edge = + Enum.find(reloaded_workflow.edges, &(&1.id == trigger_edge.id)) + + assert saved_trigger_edge.target_job_id == job_c_id + + # Job edge preserved ID, source retargeted to job_c + saved_job_edge = + Enum.find(reloaded_workflow.edges, &(&1.id == job_edge.id)) + + assert saved_job_edge.source_job_id == job_c_id + assert saved_job_edge.target_job_id == job_b.id end end @@ -1463,6 +1518,37 @@ defmodule Lightning.Projects.ProvisionerTest do end end + defp merged_workflow_setup do + project = insert(:project) + workflow = insert(:workflow, project: project, lock_version: 5) + job1 = insert(:job, workflow: workflow, name: "job1") + trigger = insert(:trigger, workflow: workflow, type: :webhook) + + trigger_edge = + insert(:edge, + workflow: workflow, + source_trigger: trigger, + target_job: job1 + ) + + job2 = insert(:job, workflow: workflow, name: "job2") + + job_edge = + insert(:edge, workflow: workflow, source_job: job1, target_job: job2) + + workflow = Repo.update!(Ecto.Changeset.change(workflow, lock_version: 6)) + + %{ + project: project, + workflow: workflow, + job1: job1, + job2: job2, + trigger: trigger, + trigger_edge: trigger_edge, + 
job_edge: job_edge + } + end + defp valid_document(project_id \\ nil, number_of_workflows \\ 1) do project_id = project_id || Ecto.UUID.generate() @@ -1547,27 +1633,7 @@ defmodule Lightning.Projects.ProvisionerTest do end defp flatten_errors(changeset) do - Ecto.Changeset.traverse_errors( - changeset, - &translate_error/1 - ) - end - - defp add_job_to_document(document, workflow_id, job_params) do - document - |> Map.update!("workflows", fn workflows -> - workflows - |> Enum.map(fn workflow -> - if workflow["id"] == workflow_id do - workflow - |> Map.update!("jobs", fn jobs -> - [job_params | jobs] - end) - else - workflow - end - end) - end) + Ecto.Changeset.traverse_errors(changeset, &translate_error/1) end defp add_entity_to_workflow(document, workflow_id, key, params) do diff --git a/test/lightning/workflows_test.exs b/test/lightning/workflows_test.exs index cd8dace486..04ef9359ac 100644 --- a/test/lightning/workflows_test.exs +++ b/test/lightning/workflows_test.exs @@ -433,6 +433,413 @@ defmodule Lightning.WorkflowsTest do refute Repo.get(Workflows.Edge, edge.id) end + test "edge retargeting: delete job and retarget edge to replacement job" do + project = insert(:project) + user = insert(:user) + + trigger_id = Ecto.UUID.generate() + job_a_id = Ecto.UUID.generate() + job_b_id = Ecto.UUID.generate() + + {:ok, workflow} = + Workflows.save_workflow( + %{ + name: "retarget-test", + project_id: project.id, + jobs: [ + %{id: job_a_id, name: "job-a", body: "fn(state)"}, + %{id: job_b_id, name: "job-b", body: "fn(state)"} + ], + triggers: [%{id: trigger_id, type: :webhook}], + edges: [ + %{ + source_trigger_id: trigger_id, + target_job_id: job_a_id, + condition_type: :always + }, + %{ + source_job_id: job_a_id, + target_job_id: job_b_id, + condition_type: :on_job_success + } + ] + }, + user + ) + + edge_a_to_b = + Enum.find(workflow.edges, &(&1.target_job_id == job_b_id)) + + # Payload: delete job_b, add job_c, retarget the same edge to job_c + job_c_id = Ecto.UUID.generate() + + assert {:ok, saved} = + Workflows.change_workflow(workflow, %{ + jobs: [ + %{id: job_a_id, name: "job-a", body: "fn(state)"}, + %{id: job_c_id, name: "job-c", body: "fn(state)"} + ], + triggers: [%{id: trigger_id, type: :webhook}], + edges: [ + %{ + id: + Enum.find( + workflow.edges, + &(&1.source_trigger_id == trigger_id) + ).id, + source_trigger_id: trigger_id, + target_job_id: job_a_id, + condition_type: :always + }, + %{ + id: edge_a_to_b.id, + source_job_id: job_a_id, + target_job_id: job_c_id, + condition_type: :on_job_success + } + ] + }) + |> Workflows.save_workflow(user) + + saved = Repo.preload(saved, [:jobs, :edges], force: true) + + assert length(saved.jobs) == 2 + refute Enum.any?(saved.jobs, &(&1.id == job_b_id)) + assert Enum.any?(saved.jobs, &(&1.id == job_c_id)) + + # Edge preserved its ID but points to the new job + retargeted = Enum.find(saved.edges, &(&1.id == edge_a_to_b.id)) + assert retargeted.target_job_id == job_c_id + assert retargeted.source_job_id == job_a_id + end + + test "edge retargeting: replace first job retargets trigger edge target and downstream edge source" do + project = insert(:project) + user = insert(:user) + + trigger_id = Ecto.UUID.generate() + job_a_id = Ecto.UUID.generate() + job_b_id = Ecto.UUID.generate() + + {:ok, workflow} = + Workflows.save_workflow( + %{ + name: "replace-first-job", + project_id: project.id, + jobs: [ + %{id: job_a_id, name: "job-a", body: "fn(state)"}, + %{id: job_b_id, name: "job-b", body: "fn(state)"} + ], + triggers: [%{id: trigger_id, type: 
:webhook}], + edges: [ + %{ + source_trigger_id: trigger_id, + target_job_id: job_a_id, + condition_type: :always + }, + %{ + source_job_id: job_a_id, + target_job_id: job_b_id, + condition_type: :on_job_success + } + ] + }, + user + ) + + trigger_edge = + Enum.find(workflow.edges, &(&1.source_trigger_id == trigger_id)) + + job_edge = + Enum.find(workflow.edges, &(&1.source_job_id == job_a_id)) + + # Delete job_a, add job_c, retarget both edges + job_c_id = Ecto.UUID.generate() + + assert {:ok, saved} = + Workflows.change_workflow(workflow, %{ + jobs: [ + %{id: job_c_id, name: "job-c", body: "fn(state)"}, + %{id: job_b_id, name: "job-b", body: "fn(state)"} + ], + triggers: [%{id: trigger_id, type: :webhook}], + edges: [ + %{ + id: trigger_edge.id, + source_trigger_id: trigger_id, + target_job_id: job_c_id, + condition_type: :always + }, + %{ + id: job_edge.id, + source_job_id: job_c_id, + target_job_id: job_b_id, + condition_type: :on_job_success + } + ] + }) + |> Workflows.save_workflow(user) + + saved = Repo.preload(saved, [:jobs, :edges], force: true) + + assert length(saved.jobs) == 2 + assert length(saved.edges) == 2 + + # Trigger edge preserved ID and retargeted to job_c + saved_trigger_edge = + Enum.find(saved.edges, &(&1.id == trigger_edge.id)) + + assert saved_trigger_edge.target_job_id == job_c_id + + # Job edge preserved ID and source retargeted to job_c + saved_job_edge = Enum.find(saved.edges, &(&1.id == job_edge.id)) + assert saved_job_edge.source_job_id == job_c_id + assert saved_job_edge.target_job_id == job_b_id + end + + test "edge retargeting: delete first job cleans up trigger edge and downstream edge" do + project = insert(:project) + user = insert(:user) + + trigger_id = Ecto.UUID.generate() + job_a_id = Ecto.UUID.generate() + job_b_id = Ecto.UUID.generate() + + {:ok, workflow} = + Workflows.save_workflow( + %{ + name: "delete-first-job", + project_id: project.id, + jobs: [ + %{id: job_a_id, name: "job-a", body: "fn(state)"}, + %{id: job_b_id, name: "job-b", body: "fn(state)"} + ], + triggers: [%{id: trigger_id, type: :webhook}], + edges: [ + %{ + source_trigger_id: trigger_id, + target_job_id: job_a_id, + condition_type: :always + }, + %{ + source_job_id: job_a_id, + target_job_id: job_b_id, + condition_type: :on_job_success + } + ] + }, + user + ) + + trigger_edge = + Enum.find(workflow.edges, &(&1.source_trigger_id == trigger_id)) + + job_edge = + Enum.find(workflow.edges, &(&1.source_job_id == job_a_id)) + + # Delete job_a, remove both edges, keep only job_b + assert {:ok, saved} = + Workflows.change_workflow(workflow, %{ + jobs: [ + %{id: job_b_id, name: "job-b", body: "fn(state)"} + ], + triggers: [%{id: trigger_id, type: :webhook}], + edges: [] + }) + |> Workflows.save_workflow(user) + + saved = Repo.preload(saved, [:jobs, :edges], force: true) + + assert length(saved.jobs) == 1 + assert hd(saved.jobs).id == job_b_id + assert saved.edges == [] + + # Both edges cleaned up + refute Repo.get(Workflows.Edge, trigger_edge.id) + refute Repo.get(Workflows.Edge, job_edge.id) + end + + test "edge retargeting: delete middle job in chain cleans up both edges" do + project = insert(:project) + user = insert(:user) + + trigger_id = Ecto.UUID.generate() + job_a_id = Ecto.UUID.generate() + job_b_id = Ecto.UUID.generate() + job_c_id = Ecto.UUID.generate() + + {:ok, workflow} = + Workflows.save_workflow( + %{ + name: "delete-middle-job", + project_id: project.id, + jobs: [ + %{id: job_a_id, name: "job-a", body: "fn(state)"}, + %{id: job_b_id, name: "job-b", body: "fn(state)"}, 
+ %{id: job_c_id, name: "job-c", body: "fn(state)"} + ], + triggers: [%{id: trigger_id, type: :webhook}], + edges: [ + %{ + source_trigger_id: trigger_id, + target_job_id: job_a_id, + condition_type: :always + }, + %{ + source_job_id: job_a_id, + target_job_id: job_b_id, + condition_type: :on_job_success + }, + %{ + source_job_id: job_b_id, + target_job_id: job_c_id, + condition_type: :on_job_success + } + ] + }, + user + ) + + trigger_edge = + Enum.find(workflow.edges, &(&1.source_trigger_id == trigger_id)) + + a_to_b_edge = + Enum.find(workflow.edges, &(&1.target_job_id == job_b_id)) + + b_to_c_edge = + Enum.find(workflow.edges, &(&1.source_job_id == job_b_id)) + + # Delete job_b, keep trigger -> job_a edge only + assert {:ok, saved} = + Workflows.change_workflow(workflow, %{ + jobs: [ + %{id: job_a_id, name: "job-a", body: "fn(state)"}, + %{id: job_c_id, name: "job-c", body: "fn(state)"} + ], + triggers: [%{id: trigger_id, type: :webhook}], + edges: [ + %{ + id: trigger_edge.id, + source_trigger_id: trigger_id, + target_job_id: job_a_id, + condition_type: :always + } + ] + }) + |> Workflows.save_workflow(user) + + saved = Repo.preload(saved, [:jobs, :edges], force: true) + + assert length(saved.jobs) == 2 + assert length(saved.edges) == 1 + assert hd(saved.edges).id == trigger_edge.id + + # a->b edge deleted (lost target) + refute Repo.get(Workflows.Edge, a_to_b_edge.id) + # b->c edge deleted (lost source) + refute Repo.get(Workflows.Edge, b_to_c_edge.id) + end + + test "edge retargeting: replace middle job retargets both edges" do + project = insert(:project) + user = insert(:user) + + trigger_id = Ecto.UUID.generate() + job_a_id = Ecto.UUID.generate() + job_b_id = Ecto.UUID.generate() + job_c_id = Ecto.UUID.generate() + + {:ok, workflow} = + Workflows.save_workflow( + %{ + name: "replace-middle-job", + project_id: project.id, + jobs: [ + %{id: job_a_id, name: "job-a", body: "fn(state)"}, + %{id: job_b_id, name: "job-b", body: "fn(state)"}, + %{id: job_c_id, name: "job-c", body: "fn(state)"} + ], + triggers: [%{id: trigger_id, type: :webhook}], + edges: [ + %{ + source_trigger_id: trigger_id, + target_job_id: job_a_id, + condition_type: :always + }, + %{ + source_job_id: job_a_id, + target_job_id: job_b_id, + condition_type: :on_job_success + }, + %{ + source_job_id: job_b_id, + target_job_id: job_c_id, + condition_type: :on_job_success + } + ] + }, + user + ) + + trigger_edge = + Enum.find(workflow.edges, &(&1.source_trigger_id == trigger_id)) + + a_to_b_edge = + Enum.find(workflow.edges, &(&1.target_job_id == job_b_id)) + + b_to_c_edge = + Enum.find(workflow.edges, &(&1.source_job_id == job_b_id)) + + # Delete job_b, add job_d, retarget both edges + job_d_id = Ecto.UUID.generate() + + assert {:ok, saved} = + Workflows.change_workflow(workflow, %{ + jobs: [ + %{id: job_a_id, name: "job-a", body: "fn(state)"}, + %{id: job_d_id, name: "job-d", body: "fn(state)"}, + %{id: job_c_id, name: "job-c", body: "fn(state)"} + ], + triggers: [%{id: trigger_id, type: :webhook}], + edges: [ + %{ + id: trigger_edge.id, + source_trigger_id: trigger_id, + target_job_id: job_a_id, + condition_type: :always + }, + %{ + id: a_to_b_edge.id, + source_job_id: job_a_id, + target_job_id: job_d_id, + condition_type: :on_job_success + }, + %{ + id: b_to_c_edge.id, + source_job_id: job_d_id, + target_job_id: job_c_id, + condition_type: :on_job_success + } + ] + }) + |> Workflows.save_workflow(user) + + saved = Repo.preload(saved, [:jobs, :edges], force: true) + + assert length(saved.jobs) == 3 + assert 
length(saved.edges) == 3 + + # a->b edge retargeted to a->d + retargeted_a = Enum.find(saved.edges, &(&1.id == a_to_b_edge.id)) + assert retargeted_a.source_job_id == job_a_id + assert retargeted_a.target_job_id == job_d_id + + # b->c edge retargeted to d->c + retargeted_b = Enum.find(saved.edges, &(&1.id == b_to_c_edge.id)) + assert retargeted_b.source_job_id == job_d_id + assert retargeted_b.target_job_id == job_c_id + end + test "saving with locks" do user = insert(:user) valid_attrs = params_with_assocs(:workflow, jobs: [params_for(:job)]) From 1996959fa0a7ce953054b8691da2a3d043948cc1 Mon Sep 17 00:00:00 2001 From: Midigo Frank <39288959+midigofrank@users.noreply.github.com> Date: Tue, 10 Feb 2026 11:09:28 +0300 Subject: [PATCH 10/11] Update version chip to reuse existing component (#4389) * reuse existing version_chip component * update changelog --- CHANGELOG.md | 2 ++ lib/lightning_web/components/layout_components.ex | 8 ++------ lib/lightning_web/live/components/common.ex | 2 +- 3 files changed, 5 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7c77bc0a6a..594d05c792 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -25,6 +25,8 @@ and this project adheres to ### Fixed +- Version chip missing tooltips + [#4389](https://github.com/OpenFn/lightning/pull/4389) - Fixed StaleEntryError when saving workflows where a job is replaced and its edge retargeted to a new job (e.g. via AI assistant) [#4383](https://github.com/OpenFn/lightning/issues/4383) diff --git a/lib/lightning_web/components/layout_components.ex b/lib/lightning_web/components/layout_components.ex index f7547148f6..56de80372e 100644 --- a/lib/lightning_web/components/layout_components.ex +++ b/lib/lightning_web/components/layout_components.ex @@ -458,18 +458,14 @@ defmodule LightningWeb.LayoutComponents do
-
- v{Application.spec(:lightning, :vsn)} -
+ <%!-- Collapsed branding: centered --%>