diff --git a/lib/gitlab/database/background_operation/common_job.rb b/lib/gitlab/database/background_operation/common_job.rb
index 318e9accafdbdf4e9e8abcfcd0c739eb53901665..94dc370378c3adc0e3ccee3c1904c1cb9880f105 100644
--- a/lib/gitlab/database/background_operation/common_job.rb
+++ b/lib/gitlab/database/background_operation/common_job.rb
@@ -38,6 +38,8 @@ module CommonJob
 
           scope :for_partition, ->(partition) { where(partition: partition) }
           scope :executable, -> { with_statuses(:pending, :running) }
+          scope :failed, -> { with_status(:failed) }
+          scope :running, -> { with_status(:running) }
 
           # Partition should not be changed once the record is created
           attr_readonly :partition
@@ -65,8 +67,29 @@ module CommonJob
             state :running, value: 1
             state :failed, value: 2
             state :succeeded, value: 3
+
+            event :run do
+              transition pending: :running
+            end
+
+            event :succeed do
+              transition running: :succeeded
+            end
+
+            event :failure do
+              transition [:pending, :running] => :failed
+            end
           end
         end
+
+        def failure!(error:)
+          update!(
+            status: :failed,
+            finished_at: Time.current,
+            error_message: error.message,
+            error_class: error.class.name
+          )
+        end
       end
     end
   end
diff --git a/lib/gitlab/database/background_operation/orchestrator.rb b/lib/gitlab/database/background_operation/orchestrator.rb
new file mode 100644
index 0000000000000000000000000000000000000000..af3cac990028548f60602db0a09a7e090ec2a633
--- /dev/null
+++ b/lib/gitlab/database/background_operation/orchestrator.rb
@@ -0,0 +1,146 @@
+# frozen_string_literal: true
+
+module Gitlab
+  module Database
+    module BackgroundOperation
+      # Advances a background operation by at most one job per call.
+      #
+      # Each call creates a job for the next batch, re-runs a failed job once no
+      # further batch is available, or finalizes the operation when there is
+      # nothing left to run.
+      class Orchestrator
+        # Failure-ratio checks are skipped until this many jobs exist.
+        MINIMUM_JOBS_FOR_FAILURE_CHECK = 10
+        # The operation is failed once more than this share of its jobs failed.
+        MAXIMUM_FAILURE_RATIO = 0.5
+
+        # connection - handed to the batching strategy and to the default executor.
+        # executor - object that runs a job through `perform(job)`.
+        def initialize(connection:, executor: Executor.new(connection: connection))
+          @connection = connection
+          @executor = executor
+        end
+
+        # Runs the next job of the operation, or finalizes the operation when no
+        # job is left.
+        #
+        # When the next job cannot be persisted the error is reported and the
+        # call returns without finalizing: an invalid job record says nothing
+        # about whether the remaining batches were processed.
+        def orchestrate_operation_job(active_operation)
+          begin
+            next_job = find_or_create_next_job(active_operation)
+          rescue ActiveRecord::RecordInvalid => error
+            report_job_creation_failure(active_operation, error)
+            return
+          end
+
+          return finish_active_operation(active_operation) unless next_job
+
+          executor.perform(next_job)
+          adjust_operation(active_operation)
+          active_operation.failure! if next_job.failed? && should_stop_operation?(active_operation)
+        end
+
+        private
+
+        attr_reader :connection, :executor
+
+        # Returns a new job for the next batch, a previously failed job when no
+        # batch is left, or nil when neither exists.
+        #
+        # Raises ActiveRecord::RecordInvalid when the new job fails validation.
+        def find_or_create_next_job(active_operation)
+          next_batch_min, next_batch_max = find_next_batch_range(active_operation)
+
+          return active_operation.jobs.failed.first if next_batch_min.nil? || next_batch_max.nil?
+
+          active_operation.jobs.create!(
+            min_cursor: [next_batch_min],
+            max_cursor: [next_batch_max],
+            batch_size: active_operation.batch_size,
+            sub_batch_size: active_operation.sub_batch_size,
+            pause_ms: active_operation.pause_ms
+          )
+        end
+
+        # Reports a job that could not be created, tagged with the operation's target.
+        def report_job_creation_failure(active_operation, error)
+          Gitlab::ErrorTracking.track_exception(
+            error,
+            table_name: active_operation.table_name,
+            column_name: active_operation.column_name
+          )
+        end
+
+        # Asks the batching strategy for the next batch and returns its bounds
+        # clamped to the operation's range, or nil when no batch is left.
+        def find_next_batch_range(active_operation)
+          batching_strategy = active_operation.batch_class.new(connection: connection)
+          batch_min_value = next_min_value(active_operation)
+
+          next_batch_bounds = batching_strategy.next_batch(
+            active_operation.table_name,
+            active_operation.column_name,
+            batch_min_value: batch_min_value,
+            batch_size: active_operation.batch_size,
+            job_arguments: active_operation.job_arguments,
+            job_class: active_operation.job_class)
+
+          return if next_batch_bounds.nil?
+
+          clamped_batch_range(active_operation, next_batch_bounds)
+        end
+
+        # Returns nil when the batch starts past the operation's max cursor,
+        # otherwise [min, max] with max capped at the operation's max cursor.
+        def clamped_batch_range(active_operation, next_bounds)
+          next_min, next_max = next_bounds
+
+          return if (next_min <=> active_operation.max_cursor.first) > 0
+
+          next_max = active_operation.max_cursor.first if (next_max <=> active_operation.max_cursor.first) > 0
+
+          [next_min, next_max]
+        end
+
+        # Starting value for the next batch: the max cursor of the last job,
+        # or the operation's min cursor when no such value is available.
+        def next_min_value(active_operation)
+          last_job = active_operation.last_job
+          return active_operation.min_cursor.first unless last_job
+
+          last_job.max_cursor&.first || active_operation.min_cursor.first
+        end
+
+        # Finalizes an operation that has no job left to run. Does nothing while
+        # jobs are still running; otherwise the operation is marked failed when
+        # any job failed and finished when none did.
+        def finish_active_operation(active_operation)
+          return if active_operation.jobs.running.exists?
+
+          if active_operation.jobs.failed.exists?
+            active_operation.failure!
+          else
+            active_operation.finish!
+          end
+        end
+
+        # True once at least MINIMUM_JOBS_FOR_FAILURE_CHECK jobs exist and more
+        # than MAXIMUM_FAILURE_RATIO of them have failed.
+        def should_stop_operation?(active_operation)
+          total_jobs = active_operation.jobs.count
+          return false if total_jobs < MINIMUM_JOBS_FOR_FAILURE_CHECK
+
+          failed_jobs = active_operation.jobs.failed.count
+          failed_jobs.fdiv(total_jobs) > MAXIMUM_FAILURE_RATIO
+        end
+
+        def adjust_operation(active_operation)
+          # Future: Add health status checks and optimization logic
+          # Similar to BatchedMigrationRunner's adjust_migration method
+        end
+      end
+    end
+  end
+end
diff --git a/lib/gitlab/database/background_operation/worker.rb b/lib/gitlab/database/background_operation/worker.rb
index 5f6d05df2778671a4688eb8cdabbf22512054d56..cec6ae0572c31001455acdc300a6e804e1390be3 100644
--- a/lib/gitlab/database/background_operation/worker.rb
+++ b/lib/gitlab/database/background_operation/worker.rb
@@ -14,6 +14,11 @@ class Worker < SharedModel
           inverse_of: :worker,
           partition_foreign_key: :worker_partition
 
+        has_one :last_job, -> { where.not(max_cursor: nil).order(max_cursor: :desc) },
+          class_name: 'Gitlab::Database::BackgroundOperation::Job',
+          inverse_of: :worker,
+          partition_foreign_key: :worker_partition
+
         belongs_to :organization
         belongs_to :user
       end