81 changes: 39 additions & 42 deletions lib/delayed/monitor.rb
@@ -95,37 +95,34 @@ def default_tags
}
end

def grouped_count(scope, **kwargs)
selects = kwargs.map { |k, v| "#{v} AS #{k}" }.join(', ')
counts = kwargs.keys.map { |k| "SUM(#{k}) AS #{k}" }.join(', ')

Delayed::Job.from(scope.select("priority, queue, #{selects}")
.group(:priority, :queue))
# This method generates a query that scans the specified scope, groups by
# priority and queue, and calculates the specified aggregates. An outer
# query is executed for priority bucketing and appending db_now_utc (to
# avoid running these computations for each tuple in the inner query).
def grouped_query(scope, include_db_time: false, **kwargs)
inner_selects = kwargs.map { |key, (agg, expr)| as_expression(agg, expr, key) }
outer_selects = kwargs.map { |key, (agg, _)| as_expression(agg == :count ? :sum : agg, key, key) }
outer_selects << "#{self.class.sql_now_in_utc} AS db_now_utc" if include_db_time

Delayed::Job
.from(scope.select(:priority, :queue, *inner_selects).group(:priority, :queue))
.group(priority_case_statement, :queue).select(
counts,
*outer_selects,
"#{priority_case_statement} AS priority",
'queue AS queue',
).group_by { |j| [j.priority.to_i, j.queue] }
.transform_values(&:first)
end
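
To make the generated query concrete, here is a hedged sketch of roughly what the failed_counts call further down (grouped_query(jobs.failed, count: [:count, '*'])) should compile to. The subquery alias, the failed-scope predicate, the priority CASE expression, and the UTC-now function are stand-ins for whatever the adapter, priority_case_statement, and sql_now_in_utc actually emit.

grouped_query(jobs.failed, count: [:count, '*'])
# Roughly:
#
#   SELECT SUM(count) AS count,
#          <priority_case_statement> AS priority,
#          queue AS queue
#   FROM (SELECT priority, queue, COUNT(*) AS count
#         FROM delayed_jobs
#         WHERE <failed-scope predicate>
#         GROUP BY priority, queue) subquery
#   GROUP BY <priority_case_statement>, queue
#
# With include_db_time: true, the outer SELECT also gets
# "<sql_now_in_utc> AS db_now_utc". The relation is then grouped in Ruby into
# a Hash keyed by [priority, queue], with one record per bucket.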

def grouped_min(scope, column)
Delayed::Job.from(scope.select("priority, queue, MIN(#{column}) AS #{column}"))
.group(priority_case_statement, :queue)
.select(
"MIN(#{column}) AS #{column}",
"#{priority_case_statement} AS priority",
'queue AS queue',
"#{self.class.sql_now_in_utc} AS db_now_utc",
).group_by { |j| [j.priority.to_i, j.queue] }
.transform_values(&:first)
def as_expression(aggregate_function, aggregate_expression, column_name)
"#{aggregate_function.to_s.upcase}(#{aggregate_expression}) AS #{column_name}"
end
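
For reference, two illustrative calls and the strings this helper would return:

as_expression(:count, '*', :count)            # => "COUNT(*) AS count"
as_expression(:min, 'locked_at', :locked_at)  # => "MIN(locked_at) AS locked_at"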

def count_grouped
if Job.connection.supports_partial_index?
failed_count_grouped.merge(live_count_grouped) { |_, l, f| l + f }
else
grouped_count(jobs, count: 'COUNT(*)').transform_values(&:count)
grouped_query(jobs, count: [:count, '*']).transform_values(&:count)
end
end
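
When the adapter supports a partial index, the failed and live tallies stay as two narrower queries and are merged per [priority, queue] bucket. A small sketch of that merge, with made-up values and assuming both sides are plain count hashes:

failed = { [0, 'default'] => 2, [10, 'mailers'] => 1 }
live   = { [0, 'default'] => 7 }
failed.merge(live) { |_key, failed_count, live_count| failed_count + live_count }
# => { [0, 'default'] => 9, [10, 'mailers'] => 1 }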

@@ -142,23 +139,23 @@ def erroring_count_grouped
end

def locked_count_grouped
@memo[:locked_count_grouped] ||= pending_counts.transform_values(&:claimed_count)
pending_counts.transform_values(&:claimed_count)
end

def failed_count_grouped
@memo[:failed_count_grouped] ||= grouped_count(jobs.failed, count: 'COUNT(*)').transform_values(&:count)
failed_counts.transform_values(&:count)
end

def max_lock_age_grouped
oldest_locked_at_query.transform_values { |j| time_ago(db_now(j), j.locked_at) }
pending_counts.transform_values { |j| time_ago(db_now(j), j.locked_at) }
end

def max_age_grouped
oldest_run_at_query.transform_values { |j| time_ago(db_now(j), j.run_at) }
live_counts.transform_values { |j| time_ago(db_now(j), j.run_at) }
end

def alert_age_percent_grouped
oldest_run_at_query.each_with_object({}) do |((priority, queue), j), metrics|
live_counts.each_with_object({}) do |((priority, queue), j), metrics|
max_age = time_ago(db_now(j), j.run_at)
alert_age = Priority.new(priority).alert_age
metrics[[priority, queue]] = [max_age / alert_age * 100, 100].min if alert_age
@@ -172,38 +169,38 @@ def workable_count_grouped
alias working_count_grouped locked_count_grouped

def oldest_locked_job_grouped
oldest_locked_at_query.transform_values(&:locked_at)
pending_counts.transform_values(&:locked_at).compact
end

def oldest_workable_job_grouped
oldest_run_at_query.transform_values(&:run_at)
end

def oldest_locked_at_query
@memo[:oldest_locked_at_query] ||= grouped_min(jobs.claimed, :locked_at)
end

def oldest_run_at_query
@memo[:oldest_run_at_query] ||= grouped_min(jobs.claimable, :run_at)
live_counts.transform_values(&:run_at).compact
end

def live_counts
@memo[:live_counts] ||= grouped_count(
@memo[:live_counts] ||= grouped_query(
jobs.live,
count: 'COUNT(*)',
future_count: "SUM(#{case_when(Job.future_clause.to_sql)})",
erroring_count: "SUM(#{case_when(Job.erroring_clause.to_sql)})",
include_db_time: true,
count: [:count, '*'],
future_count: [:sum, case_when(Job.future_clause.to_sql)],
erroring_count: [:sum, case_when(Job.erroring_clause.to_sql)],
run_at: [:min, case_when(Job.pending_clause.to_sql, 'run_at')],
)
end
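
A hedged sketch of how one memoized record from live_counts is read elsewhere in the class; the bucket key and numbers are illustrative, and each *_clause means whatever that scope defines:

record = live_counts[[0, 'default']]  # one record per [priority, queue] bucket
record.count           # total live jobs in the bucket (outer SUM of the inner COUNT(*))
record.future_count    # rows matching Job.future_clause
record.erroring_count  # rows matching Job.erroring_clause
record.run_at          # earliest run_at among rows matching Job.pending_clause, nil if none
record.db_now_utc      # database clock in UTC, appended by include_db_time: true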

def pending_counts
@memo[:pending_counts] ||= grouped_count(
@memo[:pending_counts] ||= grouped_query(
jobs.pending,
claimed_count: "SUM(#{case_when(Job.claimed_clause.to_sql)})",
claimable_count: "SUM(#{case_when(Job.claimable_clause.to_sql)})",
include_db_time: true,
claimed_count: [:sum, case_when(Job.claimed_clause.to_sql)],
claimable_count: [:sum, case_when(Job.claimable_clause.to_sql)],
locked_at: [:min, case_when(Job.claimed_clause.to_sql, 'locked_at')],
)
end

def failed_counts
@memo[:failed_counts] ||= grouped_query(jobs.failed, count: [:count, '*'])
end

def db_now(record)
self.class.parse_utc_time(record.db_now_utc)
end
@@ -212,8 +209,8 @@ def time_ago(now, value)
[now - (value || now), 0].max
end
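
A quick illustration of the clamping behavior: a missing timestamp is treated as "now", so the reported age never goes negative.

time_ago(Time.utc(2024, 1, 1, 12, 0, 10), Time.utc(2024, 1, 1, 12, 0, 0)) # => 10.0
time_ago(Time.utc(2024, 1, 1, 12, 0, 10), nil)                            # nil counts as "now", so this clamps to zero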

def case_when(condition)
"CASE WHEN #{condition} THEN 1 ELSE 0 END"
def case_when(condition, true_val = 1)
"CASE WHEN #{condition} THEN #{true_val} ELSE #{true_val == 1 ? 0 : 'NULL'} END"
end
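
Two illustrative expansions, using made-up conditions in place of the Job.*_clause SQL: the default still yields a 1/0 flag that SUM can add up, while passing a column name yields the column or NULL so that MIN only sees matching rows.

case_when('failed_at IS NOT NULL')
# => "CASE WHEN failed_at IS NOT NULL THEN 1 ELSE 0 END"
case_when('locked_at IS NOT NULL', 'locked_at')
# => "CASE WHEN locked_at IS NOT NULL THEN locked_at ELSE NULL END"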

def priority_case_statement