diff --git a/.rubocop.yml b/.rubocop.yml
index 9322efd8591ef18a222dcd8c3eda7016b59a2b18..40cbb37da7a0041458dd36fcef0fe70ac301df5f 100644
--- a/.rubocop.yml
+++ b/.rubocop.yml
@@ -1259,6 +1259,7 @@ Cop/SidekiqApiUsage:
     - 'lib/gitlab/redis/queues.rb'
     - 'app/workers/concerns/application_worker.rb'
     - 'config/initializers/active_job_shard_support.rb'
+    - 'app/workers/concurrency_limit/resume_worker.rb'
 
 Cop/FeatureFlagUsage:
   Include:
diff --git a/app/workers/concurrency_limit/resume_worker.rb b/app/workers/concurrency_limit/resume_worker.rb
index 37572e38b16240206088e29b797e69b1b12002a6..a721a283889716d6ef33614925a3f04a89f8f51d 100644
--- a/app/workers/concurrency_limit/resume_worker.rb
+++ b/app/workers/concurrency_limit/resume_worker.rb
@@ -40,16 +40,11 @@ def schedule_workers
     end
 
     def schedule_worker(worker)
-      _, pool = Gitlab::SidekiqSharding::Router.get_shard_instance(worker.get_sidekiq_options['store'])
-      Sidekiq::Client.via(pool) do
-        queue = ::Gitlab::SidekiqConfig::WorkerRouter.global.route(worker)
+      Gitlab::SidekiqSharding::Router.route(worker) do
         # Schedules ResumeWorker job to the respective queue of the `worker` we're resuming.
         # This is because `worker_limit` requires reading environment variables unique each sidekiq shard,
         # whereas ResumeWorker (cronjob) always runs in the default queue.
-        #
-        # rubocop: disable Cop/SidekiqApiUsage -- valid usage of scheduling to other queue
-        Sidekiq::Client.push('class' => self.class, 'args' => [worker.name], 'queue' => queue)
-        # rubocop: enable Cop/SidekiqApiUsage
+        Sidekiq::Client.push('class' => self.class, 'args' => [worker.name], 'queue' => queue_for_worker(worker))
       end
     end
 
@@ -73,7 +68,14 @@ def process_worker(worker_name)
       resumed_jobs_num = resume_processing!(worker)
       cleanup_stale_trackers(worker)
 
-      self.class.perform_in(RESCHEDULE_DELAY, worker_name) if queue_size(worker) > 0
+      if queue_size(worker) > 0
+        Gitlab::SidekiqSharding::Router.route(worker) do
+          # `enqueue_to_in` splats its trailing arguments into the job's `args`, so the worker name is
+          # passed bare; wrapping it in an array would hand `perform` a nested array instead of a String.
+          Sidekiq::Client.enqueue_to_in(queue_for_worker(worker), RESCHEDULE_DELAY, self.class, worker.name)
+        end
+      end
+
       log_extra_metadata_on_done(:resumed_jobs, resumed_jobs_num)
     end
 
@@ -104,5 +106,9 @@ def worker_limit(worker)
     def workers
       Gitlab::SidekiqConfig.workers_without_default.map(&:klass)
     end
+
+    def queue_for_worker(worker)
+      ::Gitlab::SidekiqConfig::WorkerRouter.global.route(worker)
+    end
   end
 end
diff --git a/ee/spec/workers/concurrency_limit/resume_worker_spec.rb b/ee/spec/workers/concurrency_limit/resume_worker_spec.rb
index 728e4d4cf8b8064908e2b445452e789eaf823518..aa47c6376cc1d2fdfb891bf5710972822d7bd993 100644
--- a/ee/spec/workers/concurrency_limit/resume_worker_spec.rb
+++ b/ee/spec/workers/concurrency_limit/resume_worker_spec.rb
@@ -261,7 +261,8 @@ def with_shard_client(&block)
         expect(Gitlab::SidekiqMiddleware::ConcurrencyLimit::ConcurrencyLimitService)
           .to receive(:resume_processing!)
           .with(worker_with_concurrency_limit.name)
-        expect(described_class).to receive(:perform_in)
+        expect(Sidekiq::Client).to receive(:enqueue_to_in).with('default', described_class::RESCHEDULE_DELAY,
+          described_class, worker_with_concurrency_limit.name)
 
         perform
       end