From 501f0f0fe1767a2081068ea20a468aa857386fee Mon Sep 17 00:00:00 2001 From: Quang-Minh Nguyen Date: Mon, 19 Apr 2021 10:29:47 +0700 Subject: [PATCH 01/10] Implement WorkerRouter to route a worker to the desirable queue Issue https://gitlab.com/gitlab-com/gl-infra/scalability/-/issues/1016 --- lib/gitlab/sidekiq_config/worker_router.rb | 83 ++++++++++++++++++++++ 1 file changed, 83 insertions(+) create mode 100644 lib/gitlab/sidekiq_config/worker_router.rb diff --git a/lib/gitlab/sidekiq_config/worker_router.rb b/lib/gitlab/sidekiq_config/worker_router.rb new file mode 100644 index 00000000000000..a97bce43117d5d --- /dev/null +++ b/lib/gitlab/sidekiq_config/worker_router.rb @@ -0,0 +1,83 @@ +# frozen_string_literal: true + +module Gitlab + module SidekiqConfig + class WorkerRouter + InvalidRoutingRule = Class.new(StandardError) + RuleEvalurator = Struct.new(:matcher, :queue_name) + # call-seq: + # router = WorkerRouter.new([ + # ["resource_boundary=cpu", 'cpu_boundary'], + # ["feature_category=pages", nil], + # ["*", "default"] + # ]) + # router.route(ACpuBoundaryWorker) # Return "cpu_boundary" + # router.route(JustAPagesWorker) # Return "just_a_pages_worker" + # router.route(RandomWorker) # Return "default" + # + # This class is responsible for routing a Sidekiq worker to a certain + # queue defined in the input routing rules. The input routing rules, as + # described above, is an order-matter array of tuples [query, queue_name]. + # + # - The query syntax is the same as the "queue selector" detailedly + # denoted in doc/administration/operations/extra_sidekiq_processes.md. + # + # - The queue_name must be a valid Sidekiq queue name. If the queue name + # is nil, the worker is routed to the queue generated by the name of the + # worker instead. + # + # Rules are evaluated from first to last, and as soon as we find a match + # for a given worker we stop processing for that worker (first match + # wins). 
If the worker doesn't match any rule, it falls back to the queue + # name generated from the worker name + # + # For further information, please visit: + # https://gitlab.com/gitlab-com/gl-infra/scalability/-/issues/1016 + # + def initialize(routing_rules) + @rule_evaluators = parse_routing_rules(routing_rules) + end + + def route(worker_klass) + # A medium representation to ensure the backward-compatibility of + # WorkerMatcher + worker_metadata = generate_worker_metadata(worker_klass) + @rule_evaluators.each do |evaluator| + if evaluator.matcher.match?(worker_metadata) + return evaluator.queue_name.presence || queue_from_worker_name(worker_klass) + end + end + + queue_from_worker_name(worker_klass) + end + + private + + def parse_routing_rules(routing_rules) + raise InvalidRoutingRule, 'The set of routing rule should be an array' unless routing_rules.is_a?(Array) + + routing_rules.map do |rule_tuple| + if !rule_tuple.is_a?(Array) || !rule_tuple.length == 2 + raise InvalidRoutingRule, "Routing rule `#{rule_tuple.inspect}` is invalid" + end + + RuleEvalurator.new( + ::Gitlab::SidekiqConfig::WorkerMatcher.new(rule_tuple[0]), + rule_tuple[1] + ) + end + end + + def generate_worker_metadata(worker_klass) + # The ee indicator here is insignificant and irrelevant to the matcher. + # Plus, it's not easy to determine whether a worker is **only** + # available in EE. 
+ ::Gitlab::SidekiqConfig::Worker.new(worker_klass, ee: false).to_yaml + end + + def queue_from_worker_name(worker_klass) + [worker_klass.queue_namespace, worker_klass.base_queue_name].compact.join(':') + end + end + end +end -- GitLab From a8ff3e5711ed2b8c7b54c00eaec4ff81da24ad06 Mon Sep 17 00:00:00 2001 From: Quang-Minh Nguyen Date: Mon, 19 Apr 2021 11:35:01 +0700 Subject: [PATCH 02/10] Add `sidekiq.routing_rules` to Gitlab configuration Issue https://gitlab.com/gitlab-com/gl-infra/scalability/-/issues/1016 --- app/workers/concerns/application_worker.rb | 11 +---------- config/gitlab.yml.example | 6 ++++++ config/initializers/1_settings.rb | 1 + lib/gitlab/sidekiq_config/worker_router.rb | 19 +++++++++++++++---- 4 files changed, 23 insertions(+), 14 deletions(-) diff --git a/app/workers/concerns/application_worker.rb b/app/workers/concerns/application_worker.rb index 0de26e27631106..6b88c2e17dc6e3 100644 --- a/app/workers/concerns/application_worker.rb +++ b/app/workers/concerns/application_worker.rb @@ -50,19 +50,10 @@ def inherited(subclass) end def set_queue - queue_name = [queue_namespace, base_queue_name].compact.join(':') - + queue_name = ::Gitlab::SidekiqConfig::WorkerRouter.global.route(self) sidekiq_options queue: queue_name # rubocop:disable Cop/SidekiqOptionsQueue end - def base_queue_name - name - .sub(/\AGitlab::/, '') - .sub(/Worker\z/, '') - .underscore - .tr('/', '_') - end - def queue_namespace(new_namespace = nil) if new_namespace sidekiq_options queue_namespace: new_namespace diff --git a/config/gitlab.yml.example b/config/gitlab.yml.example index da1a15302da39a..456a0b926cbca0 100644 --- a/config/gitlab.yml.example +++ b/config/gitlab.yml.example @@ -438,6 +438,12 @@ production: &base ## Sidekiq sidekiq: log_format: json # (default is the original format) + # An array of tuples indicating the rules for re-routing a worker to a + # desirable queue before scheduling. 
For example: + # routing_rules: + # - ["resource_boundary=cpu", "cpu_boundary"] + # - ["feature_category=pages", null] + # - ["*", "default"] ## Auxiliary jobs # Periodically executed jobs, to self-heal GitLab, do external synchronizations, etc. diff --git a/config/initializers/1_settings.rb b/config/initializers/1_settings.rb index 99335321f28e06..01c6c6b2effc48 100644 --- a/config/initializers/1_settings.rb +++ b/config/initializers/1_settings.rb @@ -695,6 +695,7 @@ # Settings['sidekiq'] ||= Settingslogic.new({}) Settings['sidekiq']['log_format'] ||= 'default' +Settings['sidekiq']['routing_rules'] ||= [] # # GitLab Shell diff --git a/lib/gitlab/sidekiq_config/worker_router.rb b/lib/gitlab/sidekiq_config/worker_router.rb index a97bce43117d5d..81a4c3dd7fe0f0 100644 --- a/lib/gitlab/sidekiq_config/worker_router.rb +++ b/lib/gitlab/sidekiq_config/worker_router.rb @@ -5,6 +5,11 @@ module SidekiqConfig class WorkerRouter InvalidRoutingRule = Class.new(StandardError) RuleEvalurator = Struct.new(:matcher, :queue_name) + + def self.global + @global_worker_router ||= new(::Gitlab.config.sidekiq.routing_rules) + end + # call-seq: # router = WorkerRouter.new([ # ["resource_boundary=cpu", 'cpu_boundary'], @@ -44,11 +49,11 @@ def route(worker_klass) worker_metadata = generate_worker_metadata(worker_klass) @rule_evaluators.each do |evaluator| if evaluator.matcher.match?(worker_metadata) - return evaluator.queue_name.presence || queue_from_worker_name(worker_klass) + return evaluator.queue_name.presence || queue_name_from_worker_name(worker_klass) end end - queue_from_worker_name(worker_klass) + queue_name_from_worker_name(worker_klass) end private @@ -75,8 +80,14 @@ def generate_worker_metadata(worker_klass) ::Gitlab::SidekiqConfig::Worker.new(worker_klass, ee: false).to_yaml end - def queue_from_worker_name(worker_klass) - [worker_klass.queue_namespace, worker_klass.base_queue_name].compact.join(':') + def queue_name_from_worker_name(worker_klass) + base_queue_name = + 
worker_klass.name + .sub(/\AGitlab::/, '') + .sub(/Worker\z/, '') + .underscore + .tr('/', '_') + [worker_klass.queue_namespace, base_queue_name].compact.join(':') end end end -- GitLab From b495282f16b8062963a3bcd5d5df787c4f02bb77 Mon Sep 17 00:00:00 2001 From: Quang-Minh Nguyen Date: Mon, 19 Apr 2021 14:06:28 +0700 Subject: [PATCH 03/10] Reset queue after a worker attribute changes Issue https://gitlab.com/gitlab-com/gl-infra/scalability/-/issues/1016 --- app/workers/concerns/application_worker.rb | 2 + app/workers/concerns/worker_attributes.rb | 26 ++++----- lib/gitlab/class_attributes.rb | 16 ++++++ lib/gitlab/sidekiq_config/worker_router.rb | 18 ++++--- spec/lib/gitlab/class_attributes_spec.rb | 62 +++++++++++++++------- 5 files changed, 86 insertions(+), 38 deletions(-) diff --git a/app/workers/concerns/application_worker.rb b/app/workers/concerns/application_worker.rb index 6b88c2e17dc6e3..843be4896a366c 100644 --- a/app/workers/concerns/application_worker.rb +++ b/app/workers/concerns/application_worker.rb @@ -16,6 +16,7 @@ module ApplicationWorker included do set_queue + after_set_class_attribute { set_queue } def structured_payload(payload = {}) context = Gitlab::ApplicationContext.current.merge( @@ -47,6 +48,7 @@ def logging_extras class_methods do def inherited(subclass) subclass.set_queue + subclass.after_set_class_attribute { subclass.set_queue } end def set_queue diff --git a/app/workers/concerns/worker_attributes.rb b/app/workers/concerns/worker_attributes.rb index 6f99fd089aca4d..6dee94026913c2 100644 --- a/app/workers/concerns/worker_attributes.rb +++ b/app/workers/concerns/worker_attributes.rb @@ -36,13 +36,13 @@ module WorkerAttributes def feature_category(value, *extras) raise "Invalid category. 
Use `feature_category_not_owned!` to mark a worker as not owned" if value == :not_owned - class_attributes[:feature_category] = value + set_class_attribute(:feature_category, value) end # Special case: mark this work as not associated with a feature category # this should be used for cross-cutting concerns, such as mailer workers. def feature_category_not_owned! - class_attributes[:feature_category] = :not_owned + set_class_attribute(:feature_category, :not_owned) end def get_feature_category @@ -64,7 +64,7 @@ def feature_category_not_owned? def urgency(urgency) raise "Invalid urgency: #{urgency}" unless VALID_URGENCIES.include?(urgency) - class_attributes[:urgency] = urgency + set_class_attribute(:urgency, urgency) end def get_urgency @@ -75,8 +75,8 @@ def data_consistency(data_consistency, feature_flag: nil) raise ArgumentError, "Invalid data consistency: #{data_consistency}" unless VALID_DATA_CONSISTENCIES.include?(data_consistency) raise ArgumentError, 'Data consistency is already set' if class_attributes[:data_consistency] - class_attributes[:data_consistency_feature_flag] = feature_flag if feature_flag - class_attributes[:data_consistency] = data_consistency + set_class_attribute(:data_consistency_feature_flag, feature_flag) if feature_flag + set_class_attribute(:data_consistency, data_consistency) validate_worker_attributes! end @@ -105,7 +105,7 @@ def get_data_consistency_feature_flag_enabled? # doc/development/sidekiq_style_guide.md#jobs-with-external-dependencies for # details def worker_has_external_dependencies! - class_attributes[:external_dependencies] = true + set_class_attribute(:external_dependencies, true) end # Returns a truthy value if the worker has external dependencies. @@ -118,7 +118,7 @@ def worker_has_external_dependencies? def worker_resource_boundary(boundary) raise "Invalid boundary" unless VALID_RESOURCE_BOUNDARIES.include? 
boundary - class_attributes[:resource_boundary] = boundary + set_class_attribute(:resource_boundary, boundary) end def get_worker_resource_boundary @@ -126,7 +126,7 @@ def get_worker_resource_boundary end def idempotent! - class_attributes[:idempotent] = true + set_class_attribute(:idempotent, true) validate_worker_attributes! end @@ -136,7 +136,7 @@ def idempotent? end def weight(value) - class_attributes[:weight] = value + set_class_attribute(:weight, value) end def get_weight @@ -146,7 +146,7 @@ def get_weight end def tags(*values) - class_attributes[:tags] = values + set_class_attribute(:tags, values) end def get_tags @@ -154,8 +154,8 @@ def get_tags end def deduplicate(strategy, options = {}) - class_attributes[:deduplication_strategy] = strategy - class_attributes[:deduplication_options] = options + set_class_attribute(:deduplication_strategy, strategy) + set_class_attribute(:deduplication_options, options) end def get_deduplicate_strategy @@ -168,7 +168,7 @@ def get_deduplication_options end def big_payload! - class_attributes[:big_payload] = true + set_class_attribute(:big_payload, true) end def big_payload? 
diff --git a/lib/gitlab/class_attributes.rb b/lib/gitlab/class_attributes.rb index 6560c97b2e6f0c..6eea7590cbd5ca 100644 --- a/lib/gitlab/class_attributes.rb +++ b/lib/gitlab/class_attributes.rb @@ -14,6 +14,18 @@ def get_class_attribute(name) class_attributes[name] || superclass_attributes(name) end + def set_class_attribute(name, value) + class_attributes[name] = value + + after_hooks.each(&:call) + + value + end + + def after_set_class_attribute(&block) + after_hooks << block + end + private def class_attributes @@ -25,6 +37,10 @@ def superclass_attributes(name) superclass.get_class_attribute(name) end + + def after_hooks + @after_hooks ||= [] + end end end end diff --git a/lib/gitlab/sidekiq_config/worker_router.rb b/lib/gitlab/sidekiq_config/worker_router.rb index 81a4c3dd7fe0f0..d0721fc73e233b 100644 --- a/lib/gitlab/sidekiq_config/worker_router.rb +++ b/lib/gitlab/sidekiq_config/worker_router.rb @@ -6,6 +6,16 @@ class WorkerRouter InvalidRoutingRule = Class.new(StandardError) RuleEvalurator = Struct.new(:matcher, :queue_name) + def self.queue_name_from_worker_name(worker_klass) + base_queue_name = + worker_klass.name + .delete_prefix('Gitlab::') + .delete_suffix('Worker') + .underscore + .tr('/', '_') + [worker_klass.queue_namespace, base_queue_name].compact.join(':') + end + def self.global @global_worker_router ||= new(::Gitlab.config.sidekiq.routing_rules) end @@ -81,13 +91,7 @@ def generate_worker_metadata(worker_klass) end def queue_name_from_worker_name(worker_klass) - base_queue_name = - worker_klass.name - .sub(/\AGitlab::/, '') - .sub(/Worker\z/, '') - .underscore - .tr('/', '_') - [worker_klass.queue_namespace, base_queue_name].compact.join(':') + self.class.queue_name_from_worker_name(worker_klass) end end end diff --git a/spec/lib/gitlab/class_attributes_spec.rb b/spec/lib/gitlab/class_attributes_spec.rb index f8766f20495050..ac2a18a18605b7 100644 --- a/spec/lib/gitlab/class_attributes_spec.rb +++ b/spec/lib/gitlab/class_attributes_spec.rb @@ 
-6,36 +6,62 @@ Class.new do include Gitlab::ClassAttributes - def self.get_attribute(name) - get_class_attribute(name) + class << self + attr_reader :counter_1, :counter_2 + + # get_class_attribute and set_class_attribute are protected, + # hence those methods are for testing purpose + def get_attribute(name) + get_class_attribute(name) + end + + def set_attribute(name, value) + set_class_attribute(name, value) + end + end + + after_set_class_attribute do + @counter_1 ||= 0 + @counter_1 += 1 end - def self.set_attribute(name, value) - class_attributes[name] = value + after_set_class_attribute do + @counter_2 ||= 0 + @counter_2 += 2 end end end let(:subclass) { Class.new(klass) } - describe ".get_class_attribute" do - it "returns values set on the class" do - klass.set_attribute(:foo, :bar) + it "returns values set on the class" do + klass.set_attribute(:foo, :bar) - expect(klass.get_attribute(:foo)).to eq(:bar) - end + expect(klass.get_attribute(:foo)).to eq(:bar) + end - it "returns values set on a superclass" do - klass.set_attribute(:foo, :bar) + it "returns values set on a superclass" do + klass.set_attribute(:foo, :bar) - expect(subclass.get_attribute(:foo)).to eq(:bar) - end + expect(subclass.get_attribute(:foo)).to eq(:bar) + end - it "returns values from the subclass over attributes from a superclass" do - klass.set_attribute(:foo, :baz) - subclass.set_attribute(:foo, :bar) + it "returns values from the subclass over attributes from a superclass" do + klass.set_attribute(:foo, :baz) + subclass.set_attribute(:foo, :bar) - expect(subclass.get_attribute(:foo)).to eq(:bar) - end + expect(klass.get_attribute(:foo)).to eq(:baz) + expect(subclass.get_attribute(:foo)).to eq(:bar) + end + + it "triggers after hooks after set class values" do + expect(klass.counter_1).to be(nil) + expect(klass.counter_2).to be(nil) + + klass.set_attribute(:foo, :bar) + klass.set_attribute(:foo, :bar) + + expect(klass.counter_1).to eq(2) + expect(klass.counter_2).to eq(4) end end -- 
GitLab From a5eb559246058c4d0c3ac8ce2759a0673ab851b3 Mon Sep 17 00:00:00 2001 From: Quang-Minh Nguyen Date: Tue, 20 Apr 2021 18:23:15 +0700 Subject: [PATCH 04/10] Fullfill the tests for the worker router Issue https://gitlab.com/gitlab-com/gl-infra/scalability/-/issues/1016 --- lib/gitlab/sidekiq_config/worker_router.rb | 8 +- .../sidekiq_config/worker_router_spec.rb | 152 ++++++++++++++++++ .../concerns/application_worker_spec.rb | 90 ++++++++++- 3 files changed, 242 insertions(+), 8 deletions(-) create mode 100644 spec/lib/gitlab/sidekiq_config/worker_router_spec.rb diff --git a/lib/gitlab/sidekiq_config/worker_router.rb b/lib/gitlab/sidekiq_config/worker_router.rb index d0721fc73e233b..859f2c22498d00 100644 --- a/lib/gitlab/sidekiq_config/worker_router.rb +++ b/lib/gitlab/sidekiq_config/worker_router.rb @@ -3,7 +3,7 @@ module Gitlab module SidekiqConfig class WorkerRouter - InvalidRoutingRule = Class.new(StandardError) + InvalidRoutingRuleError = Class.new(StandardError) RuleEvalurator = Struct.new(:matcher, :queue_name) def self.queue_name_from_worker_name(worker_klass) @@ -69,11 +69,11 @@ def route(worker_klass) private def parse_routing_rules(routing_rules) - raise InvalidRoutingRule, 'The set of routing rule should be an array' unless routing_rules.is_a?(Array) + raise InvalidRoutingRuleError, 'The set of routing rule should be an array' unless routing_rules.is_a?(Array) routing_rules.map do |rule_tuple| - if !rule_tuple.is_a?(Array) || !rule_tuple.length == 2 - raise InvalidRoutingRule, "Routing rule `#{rule_tuple.inspect}` is invalid" + if !rule_tuple.is_a?(Array) || rule_tuple.length != 2 + raise InvalidRoutingRuleError, "Routing rule `#{rule_tuple.inspect}` is invalid" end RuleEvalurator.new( diff --git a/spec/lib/gitlab/sidekiq_config/worker_router_spec.rb b/spec/lib/gitlab/sidekiq_config/worker_router_spec.rb new file mode 100644 index 00000000000000..234d4b916e3edd --- /dev/null +++ b/spec/lib/gitlab/sidekiq_config/worker_router_spec.rb @@ -0,0 +1,152 
@@ +# frozen_string_literal: true + +require 'fast_spec_helper' +require 'rspec-parameterized' + +RSpec.describe Gitlab::SidekiqConfig::WorkerRouter do + describe '.queue_name_from_worker_name' do + using RSpec::Parameterized::TableSyntax + + def create_worker(name, namespace = nil) + Class.new.tap do |worker| + worker.define_singleton_method(:name) { name } + worker.define_singleton_method(:queue_namespace) { namespace } + end + end + + where(:worker, :expected_name) do + create_worker('PagesWorker') | 'pages' + create_worker('PipelineNotificationWorker') | 'pipeline_notification' + create_worker('PipelineHooksWorker', :pipeline_hooks) | 'pipeline_hooks:pipeline_hooks' + create_worker('Gitlab::JiraImport::AdvanceStageWorker') | 'jira_import_advance_stage' + create_worker('Gitlab::JiraImport::AdvanceStageWorker') | 'jira_import_advance_stage' + create_worker('Gitlab::JiraImport::AdvanceStageWorker') | 'jira_import_advance_stage' + create_worker('Gitlab::PhabricatorImport::ImportTasksWorker', :importer) | 'importer:phabricator_import_import_tasks' + end + + with_them do + it 'generates a valid queue name from worker name' do + expect(described_class.queue_name_from_worker_name(worker)).to eql(expected_name) + end + end + end + + shared_context 'router examples setup' do + using RSpec::Parameterized::TableSyntax + + let(:worker) do + Class.new do + def self.name + 'Gitlab::Foo::BarWorker' + end + + include ApplicationWorker + + feature_category :feature_a + urgency :low + worker_resource_boundary :cpu + tags :expensive + end + end + + where(:routing_rules, :expected_queue) do + # Default, no configuration + [] | 'foo_bar' + # Does not match, fallback to the named queue + [ + ['feature_category=feature_b|urgency=high', 'queue_a'], + ['resource_boundary=memory', 'queue_b'], + ['tags=cheap', 'queue_c'] + ] | 'foo_bar' + # Match a nil queue, fallback to named queue + [ + ['feature_category=feature_b|urgency=high', 'queue_a'], + ['resource_boundary=cpu', nil], + 
['tags=cheap', 'queue_c'] + ] | 'foo_bar' + # Match the first criteria + [ + ['feature_category=feature_a|urgency=high', 'queue_a'], + ['resource_boundary=cpu', 'queue_b'], + ['tags=cheap', 'queue_c'] + ] | 'queue_a' + # Match the first criteria 2 + [ + ['feature_category=feature_b|urgency=low', 'queue_a'], + ['resource_boundary=cpu', 'queue_b'], + ['tags=cheap', 'queue_c'] + ] | 'queue_a' + # Match the second criteria + [ + ['feature_category=feature_b|urgency=high', 'queue_a'], + ['resource_boundary=memory', 'queue_b'], + ['tags=expensive', 'queue_c'] + ] | 'queue_c' + # Match all, first match wins + [ + ['feature_category=feature_a|urgency=low', 'queue_a'], + ['resource_boundary=cpu', 'queue_b'], + ['tags=expensive', 'queue_c'] + ] | 'queue_a' + # Match wildcard + [ + ['feature_category=feature_b|urgency=high', 'queue_a'], + ['resource_boundary=memory', 'queue_b'], + ['tags=cheap', 'queue_c'], + ['*', 'default'] + ] | 'default' + end + end + + describe '.global' do + context 'valid routing rules' do + include_context 'router examples setup' + + with_them do + before do + described_class.remove_instance_variable(:@global_worker_router) if described_class.instance_variable_defined?(:@global_worker_router) + stub_config(sidekiq: { routing_rules: routing_rules }) + end + + after do + described_class.remove_instance_variable(:@global_worker_router) + end + + it 'routes the worker to the correct queue' do + expect(described_class.global.route(worker)).to eql(expected_queue) + end + end + end + end + + describe '#route' do + context 'valid routing rules' do + include_context 'router examples setup' + + with_them do + it 'routes the worker to the correct queue' do + router = described_class.new(routing_rules) + + expect(router.route(worker)).to eql(expected_queue) + end + end + end + + context 'invalid routing rules' do + it 'raises an exception' do + expect { described_class.new(nil) }.to raise_error(described_class::InvalidRoutingRuleError) + expect { 
described_class.new(['feature_category=a']) }.to raise_error(described_class::InvalidRoutingRuleError) + expect { described_class.new([['feature_category=a', 'queue_a', 'queue_b']]) }.to raise_error(described_class::InvalidRoutingRuleError) + expect do + described_class.new( + [ + ['feature_category=a', 'queue_b'], + ['feature_category=b'] + ] + ) + end.to raise_error(described_class::InvalidRoutingRuleError) + expect { described_class.new([['invalid_term=a', 'queue_a']]) }.to raise_error(Gitlab::SidekiqConfig::WorkerMatcher::UnknownPredicate) + end + end + end +end diff --git a/spec/workers/concerns/application_worker_spec.rb b/spec/workers/concerns/application_worker_spec.rb index 07e11f014c3911..35385eff83ebcb 100644 --- a/spec/workers/concerns/application_worker_spec.rb +++ b/spec/workers/concerns/application_worker_spec.rb @@ -3,7 +3,9 @@ require 'spec_helper' RSpec.describe ApplicationWorker do - let_it_be(:worker) do + # We depend on the lazy-load characteristic of rspec. If the worker is loaded + # before setting up, it's likely to go wrong. 
+ let(:worker) do Class.new do def self.name 'Gitlab::Foo::Bar::DummyWorker' @@ -14,10 +16,77 @@ def self.name end let(:instance) { worker.new } + let(:router) { double(:router) } - describe 'Sidekiq options' do - it 'sets the queue name based on the class name' do + before do + allow(::Gitlab::SidekiqConfig::WorkerRouter).to receive(:global).and_return(router) + allow(router).to receive(:route).and_return('foo_bar_dummy') + end + + describe 'Sidekiq attribtues' do + it 'sets the queue name based on the output of the router' do expect(worker.sidekiq_options['queue']).to eq('foo_bar_dummy') + expect(router).to have_received(:route).with(worker).at_least(:once) + end + + context 'when a worker attribute is updated' do + before do + counter = 0 + allow(router).to receive(:route) do + counter += 1 + "queue_#{counter}" + end + end + + it 'updates the queue name afterward' do + expect(worker.sidekiq_options['queue']).to eq('queue_1') + + worker.feature_category :pages + expect(worker.sidekiq_options['queue']).to eq('queue_2') + + worker.feature_category_not_owned! + expect(worker.sidekiq_options['queue']).to eq('queue_3') + + worker.urgency :high + expect(worker.sidekiq_options['queue']).to eq('queue_4') + + worker.worker_has_external_dependencies! + expect(worker.sidekiq_options['queue']).to eq('queue_5') + + worker.worker_resource_boundary :cpu + expect(worker.sidekiq_options['queue']).to eq('queue_6') + + worker.idempotent! + expect(worker.sidekiq_options['queue']).to eq('queue_7') + + worker.weight 3 + expect(worker.sidekiq_options['queue']).to eq('queue_8') + + worker.tags :hello + expect(worker.sidekiq_options['queue']).to eq('queue_9') + + worker.big_payload! 
+ expect(worker.sidekiq_options['queue']).to eq('queue_10') + + expect(router).to have_received(:route).with(worker).at_least(10).times + end + end + + context 'when the worker is inherited' do + let(:sub_worker) { Class.new(worker) } + + before do + allow(router).to receive(:route).and_return('queue_1') + worker # Force loading worker 1 to update its queue + + allow(router).to receive(:route).and_return('queue_2') + end + + it 'sets the queue name for the inherited worker' do + expect(sub_worker.sidekiq_options['queue']).to eq('queue_2') + + expect(router).to have_received(:route).with(sub_worker).at_least(:once) + end end end @@ -74,11 +143,24 @@ def self.name end describe '.queue_namespace' do - it 'sets the queue name based on the class name' do + before do + allow(router).to receive(:route).and_return('foo_bar_dummy', 'some_namespace:foo_bar_dummy') + end + + it 'updates the quque name from the router again' do + expect(worker.queue).to eq('foo_bar_dummy') + worker.queue_namespace :some_namespace expect(worker.queue).to eq('some_namespace:foo_bar_dummy') end + + it 'updates the queue_namespace options of the worker' do + worker.queue_namespace :some_namespace + + expect(worker.queue_namespace).to eql('some_namespace') + expect(worker.sidekiq_options['queue_namespace']).to be(:some_namespace) + end end describe '.queue' do -- GitLab From 9dbe95126a2ca1632240f15d8d3607009eaa794d Mon Sep 17 00:00:00 2001 From: Quang-Minh Nguyen Date: Thu, 22 Apr 2021 12:33:53 +0700 Subject: [PATCH 05/10] Capture the error when the routing rule is invalid Issue https://gitlab.com/gitlab-com/gl-infra/scalability/-/issues/1016 --- lib/gitlab/sidekiq_config/worker_router.rb | 4 ++ .../sidekiq_config/worker_router_spec.rb | 51 +++++++++++++++++-- 2 files changed, 50 insertions(+), 5 deletions(-) diff --git a/lib/gitlab/sidekiq_config/worker_router.rb b/lib/gitlab/sidekiq_config/worker_router.rb index 859f2c22498d00..7c5b7172c29592 100644 --- 
a/lib/gitlab/sidekiq_config/worker_router.rb +++ b/lib/gitlab/sidekiq_config/worker_router.rb @@ -18,6 +18,10 @@ def self.queue_name_from_worker_name(worker_klass) def self.global @global_worker_router ||= new(::Gitlab.config.sidekiq.routing_rules) + rescue InvalidRoutingRuleError, ::Gitlab::SidekiqConfig::WorkerMatcher::UnknownPredicate => e + ::Gitlab::ErrorTracking.track_and_raise_for_dev_exception(e) + + @global_worker_router = new([]) end # call-seq: diff --git a/spec/lib/gitlab/sidekiq_config/worker_router_spec.rb b/spec/lib/gitlab/sidekiq_config/worker_router_spec.rb index 234d4b916e3edd..ad79641d036f86 100644 --- a/spec/lib/gitlab/sidekiq_config/worker_router_spec.rb +++ b/spec/lib/gitlab/sidekiq_config/worker_router_spec.rb @@ -99,24 +99,65 @@ def self.name end describe '.global' do + before do + described_class.remove_instance_variable(:@global_worker_router) if described_class.instance_variable_defined?(:@global_worker_router) + end + + after do + described_class.remove_instance_variable(:@global_worker_router) + end + context 'valid routing rules' do include_context 'router examples setup' with_them do before do - described_class.remove_instance_variable(:@global_worker_router) if described_class.instance_variable_defined?(:@global_worker_router) stub_config(sidekiq: { routing_rules: routing_rules }) end - after do - described_class.remove_instance_variable(:@global_worker_router) - end - it 'routes the worker to the correct queue' do expect(described_class.global.route(worker)).to eql(expected_queue) end end end + + context 'invalid routing rules' do + let(:worker) do + Class.new do + def self.name + 'Gitlab::Foo::BarWorker' + end + + include ApplicationWorker + end + end + + before do + stub_config(sidekiq: { routing_rules: routing_rules }) + end + + context 'invalid routing rules format' do + let(:routing_rules) { ['feature_category=a'] } + + it 'captures the error and falls back to an empty route' do + expect(Gitlab::ErrorTracking).to 
receive(:track_and_raise_for_dev_exception).with(be_a(described_class::InvalidRoutingRuleError)) + + expect(described_class.global.route(worker)).to eql('foo_bar') + end + end + + context 'invalid predicate' do + let(:routing_rules) { [['invalid_term=a', 'queue_a']] } + + it 'captures the error and falls back to an empty route' do + expect(Gitlab::ErrorTracking).to receive(:track_and_raise_for_dev_exception).with( + be_a(Gitlab::SidekiqConfig::WorkerMatcher::UnknownPredicate) + ) + + expect(described_class.global.route(worker)).to eql('foo_bar') + end + end + end end describe '#route' do -- GitLab From e0fcd233e56940bc5bb507e2896e6ca327a5ad75 Mon Sep 17 00:00:00 2001 From: Quang-Minh Nguyen Date: Thu, 22 Apr 2021 13:32:12 +0700 Subject: [PATCH 06/10] Add a changelog entry --- ...plement-sidekiq-queue-re-routing-in-the-application-s.yml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 changelogs/unreleased/qmnguyen0711-1016-implement-sidekiq-queue-re-routing-in-the-application-s.yml diff --git a/changelogs/unreleased/qmnguyen0711-1016-implement-sidekiq-queue-re-routing-in-the-application-s.yml b/changelogs/unreleased/qmnguyen0711-1016-implement-sidekiq-queue-re-routing-in-the-application-s.yml new file mode 100644 index 00000000000000..1db2f491f742f8 --- /dev/null +++ b/changelogs/unreleased/qmnguyen0711-1016-implement-sidekiq-queue-re-routing-in-the-application-s.yml @@ -0,0 +1,5 @@ +--- +title: Implement Sidekiq queue re-routing in the application +merge_request: 59604 +author: +type: added -- GitLab From 59b0257a30e4205ead5cf3bed16d7806470657f5 Mon Sep 17 00:00:00 2001 From: Quang-Minh Nguyen Date: Thu, 22 Apr 2021 23:07:11 +0700 Subject: [PATCH 07/10] Address code review comments Issue https://gitlab.com/gitlab-com/gl-infra/scalability/-/issues/1016 --- lib/gitlab/sidekiq_config/worker_router.rb | 17 ++++++++------ .../sidekiq_config/worker_router_spec.rb | 23 ++++++++++++++++++- 2 files changed, 32 insertions(+), 8 deletions(-) diff --git 
a/lib/gitlab/sidekiq_config/worker_router.rb b/lib/gitlab/sidekiq_config/worker_router.rb index 7c5b7172c29592..be1631485a8553 100644 --- a/lib/gitlab/sidekiq_config/worker_router.rb +++ b/lib/gitlab/sidekiq_config/worker_router.rb @@ -4,7 +4,7 @@ module Gitlab module SidekiqConfig class WorkerRouter InvalidRoutingRuleError = Class.new(StandardError) - RuleEvalurator = Struct.new(:matcher, :queue_name) + RuleEvaluator = Struct.new(:matcher, :queue_name) def self.queue_name_from_worker_name(worker_klass) base_queue_name = @@ -28,10 +28,12 @@ def self.global # router = WorkerRouter.new([ # ["resource_boundary=cpu", 'cpu_boundary'], # ["feature_category=pages", nil], + # ["feature_category=source_code_management", ''], # ["*", "default"] # ]) # router.route(ACpuBoundaryWorker) # Return "cpu_boundary" # router.route(JustAPagesWorker) # Return "just_a_pages_worker" + # router.route(PostReceive) # Return "post_receive" # router.route(RandomWorker) # Return "default" # # This class is responsible for routing a Sidekiq worker to a certain @@ -42,8 +44,8 @@ def self.global # denoted in doc/administration/operations/extra_sidekiq_processes.md. # # - The queue_name must be a valid Sidekiq queue name. If the queue name - # is nil, the worker is routed to the queue generated by the name of the - # worker instead. + # is nil, or an empty string, the worker is routed to the queue generated + # by the name of the worker instead. 
# # Rules are evaluated from first to last, and as soon as we find a match # for a given worker we stop processing for that worker (first match @@ -73,16 +75,17 @@ def route(worker_klass) private def parse_routing_rules(routing_rules) - raise InvalidRoutingRuleError, 'The set of routing rule should be an array' unless routing_rules.is_a?(Array) + raise InvalidRoutingRuleError, 'The set of routing rule must be an array' unless routing_rules.is_a?(Array) routing_rules.map do |rule_tuple| if !rule_tuple.is_a?(Array) || rule_tuple.length != 2 raise InvalidRoutingRuleError, "Routing rule `#{rule_tuple.inspect}` is invalid" end - RuleEvalurator.new( - ::Gitlab::SidekiqConfig::WorkerMatcher.new(rule_tuple[0]), - rule_tuple[1] + selector, destination_queue = rule_tuple + RuleEvaluator.new( + ::Gitlab::SidekiqConfig::WorkerMatcher.new(selector), + destination_queue ) end end diff --git a/spec/lib/gitlab/sidekiq_config/worker_router_spec.rb b/spec/lib/gitlab/sidekiq_config/worker_router_spec.rb index ad79641d036f86..984fb01a432725 100644 --- a/spec/lib/gitlab/sidekiq_config/worker_router_spec.rb +++ b/spec/lib/gitlab/sidekiq_config/worker_router_spec.rb @@ -1,6 +1,6 @@ # frozen_string_literal: true -require 'fast_spec_helper' +require 'spec_helper' require 'rspec-parameterized' RSpec.describe Gitlab::SidekiqConfig::WorkerRouter do @@ -17,6 +17,8 @@ def create_worker(name, namespace = nil) where(:worker, :expected_name) do create_worker('PagesWorker') | 'pages' create_worker('PipelineNotificationWorker') | 'pipeline_notification' + create_worker('PostReceive') | 'post_receive' + create_worker('PostReceive', :git) | 'git:post_receive' create_worker('PipelineHooksWorker', :pipeline_hooks) | 'pipeline_hooks:pipeline_hooks' create_worker('Gitlab::JiraImport::AdvanceStageWorker') | 'jira_import_advance_stage' create_worker('Gitlab::JiraImport::AdvanceStageWorker') | 'jira_import_advance_stage' @@ -64,6 +66,12 @@ def self.name ['resource_boundary=cpu', nil], ['tags=cheap', 
'queue_c'] ] | 'foo_bar' + # Match an empty string, fallback to named queue + [ + ['feature_category=feature_b|urgency=high', 'queue_a'], + ['resource_boundary=cpu', ''], + ['tags=cheap', 'queue_c'] + ] | 'foo_bar' # Match the first criteria [ ['feature_category=feature_a|urgency=high', 'queue_a'], @@ -88,6 +96,12 @@ def self.name ['resource_boundary=cpu', 'queue_b'], ['tags=expensive', 'queue_c'] ] | 'queue_a' + # Match the same criteria multiple times, the first match wins + [ + ['feature_category=feature_a', 'queue_a'], + ['feature_category=feature_a', 'queue_b'], + ['feature_category=feature_a', 'queue_c'] + ] | 'queue_a' # Match wildcard [ ['feature_category=feature_b|urgency=high', 'queue_a'], @@ -95,6 +109,13 @@ def self.name ['tags=cheap', 'queue_c'], ['*', 'default'] ] | 'default' + # Match wildcard at the top of the chain. It makes the following rules useless + [ + ['*', 'queue_foo'], + ['feature_category=feature_b|urgency=high', 'queue_a'], + ['resource_boundary=memory', 'queue_b'], + ['tags=cheap', 'queue_c'] + ] | 'queue_foo' end end -- GitLab From c4428654d7c9de6981a2e3c51e21f5cd556457d9 Mon Sep 17 00:00:00 2001 From: Quang-Minh Nguyen Date: Fri, 23 Apr 2021 10:15:48 +0700 Subject: [PATCH 08/10] Extract routing validation to a private method --- lib/gitlab/sidekiq_config/worker_router.rb | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/lib/gitlab/sidekiq_config/worker_router.rb b/lib/gitlab/sidekiq_config/worker_router.rb index be1631485a8553..946296a24d3074 100644 --- a/lib/gitlab/sidekiq_config/worker_router.rb +++ b/lib/gitlab/sidekiq_config/worker_router.rb @@ -78,9 +78,7 @@ def parse_routing_rules(routing_rules) raise InvalidRoutingRuleError, 'The set of routing rule must be an array' unless routing_rules.is_a?(Array) routing_rules.map do |rule_tuple| - if !rule_tuple.is_a?(Array) || rule_tuple.length != 2 - raise InvalidRoutingRuleError, "Routing rule `#{rule_tuple.inspect}` is invalid" - end + raise 
InvalidRoutingRuleError, "Routing rule `#{rule_tuple.inspect}` is invalid" unless valid_routing_rule?(rule_tuple) selector, destination_queue = rule_tuple RuleEvaluator.new( @@ -90,6 +88,10 @@ def parse_routing_rules(routing_rules) end end + def valid_routing_rule?(rule_tuple) + rule_tuple.is_a?(Array) && rule_tuple.length == 2 + end + def generate_worker_metadata(worker_klass) # The ee indicator here is insignificant and irrelevant to the matcher. # Plus, it's not easy to determine whether a worker is **only** -- GitLab From 8e80170ccc4c51949d6df08cbc69c090b4a32a8b Mon Sep 17 00:00:00 2001 From: Quang-Minh Nguyen Date: Fri, 23 Apr 2021 17:08:52 +0700 Subject: [PATCH 09/10] Add a comment section to explain a gotcha with useless stubbing --- spec/workers/concerns/application_worker_spec.rb | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/spec/workers/concerns/application_worker_spec.rb b/spec/workers/concerns/application_worker_spec.rb index 35385eff83ebcb..ee569844febff2 100644 --- a/spec/workers/concerns/application_worker_spec.rb +++ b/spec/workers/concerns/application_worker_spec.rb @@ -4,7 +4,12 @@ RSpec.describe ApplicationWorker do # We depend on the lazy-load characteristic of rspec. If the worker is loaded - # before setting up, it's likely to go wrong. + # before setting up, it's likely to go wrong. Consider this gotcha: + # before do + # allow(router).to receive(:route).with(worker).and_return('queue_1') + # end + # As worker is triggered, it includes ApplicationWorker, and the router is + # called before it is stubbed. That makes the stubbing useless.
let(:worker) do Class.new do def self.name -- GitLab From 1f92b8fc3db30123e180f513975d87441ebfbd07 Mon Sep 17 00:00:00 2001 From: Quang-Minh Nguyen Date: Wed, 28 Apr 2021 10:02:20 +0700 Subject: [PATCH 10/10] Address code review comments --- .../gitlab/sidekiq_config/worker_router_spec.rb | 16 +++++++--------- spec/workers/concerns/application_worker_spec.rb | 4 ++-- 2 files changed, 9 insertions(+), 11 deletions(-) diff --git a/spec/lib/gitlab/sidekiq_config/worker_router_spec.rb b/spec/lib/gitlab/sidekiq_config/worker_router_spec.rb index 984fb01a432725..687e35813b1cfa 100644 --- a/spec/lib/gitlab/sidekiq_config/worker_router_spec.rb +++ b/spec/lib/gitlab/sidekiq_config/worker_router_spec.rb @@ -21,8 +21,6 @@ def create_worker(name, namespace = nil) create_worker('PostReceive', :git) | 'git:post_receive' create_worker('PipelineHooksWorker', :pipeline_hooks) | 'pipeline_hooks:pipeline_hooks' create_worker('Gitlab::JiraImport::AdvanceStageWorker') | 'jira_import_advance_stage' - create_worker('Gitlab::JiraImport::AdvanceStageWorker') | 'jira_import_advance_stage' - create_worker('Gitlab::JiraImport::AdvanceStageWorker') | 'jira_import_advance_stage' create_worker('Gitlab::PhabricatorImport::ImportTasksWorker', :importer) | 'importer:phabricator_import_import_tasks' end @@ -72,19 +70,19 @@ def self.name ['resource_boundary=cpu', ''], ['tags=cheap', 'queue_c'] ] | 'foo_bar' - # Match the first criteria + # Match the first rule [ ['feature_category=feature_a|urgency=high', 'queue_a'], ['resource_boundary=cpu', 'queue_b'], ['tags=cheap', 'queue_c'] ] | 'queue_a' - # Match the first criteria 2 + # Match the first rule 2 [ ['feature_category=feature_b|urgency=low', 'queue_a'], ['resource_boundary=cpu', 'queue_b'], ['tags=cheap', 'queue_c'] ] | 'queue_a' - # Match the second criteria + # Match the third rule [ ['feature_category=feature_b|urgency=high', 'queue_a'], ['resource_boundary=memory', 'queue_b'], @@ -96,7 +94,7 @@ def self.name ['resource_boundary=cpu', 
'queue_b'], ['tags=expensive', 'queue_c'] ] | 'queue_a' - # Match the same criteria multiple times, the first match wins + # Match the same rule multiple times, the first match wins [ ['feature_category=feature_a', 'queue_a'], ['feature_category=feature_a', 'queue_b'], @@ -112,9 +110,9 @@ def self.name # Match wildcard at the top of the chain. It makes the following rules useless [ ['*', 'queue_foo'], - ['feature_category=feature_b|urgency=high', 'queue_a'], - ['resource_boundary=memory', 'queue_b'], - ['tags=cheap', 'queue_c'] + ['feature_category=feature_a|urgency=low', 'queue_a'], + ['resource_boundary=cpu', 'queue_b'], + ['tags=expensive', 'queue_c'] ] | 'queue_foo' end end diff --git a/spec/workers/concerns/application_worker_spec.rb b/spec/workers/concerns/application_worker_spec.rb index ee569844febff2..5c1a1d3ae8f563 100644 --- a/spec/workers/concerns/application_worker_spec.rb +++ b/spec/workers/concerns/application_worker_spec.rb @@ -28,7 +28,7 @@ def self.name allow(router).to receive(:route).and_return('foo_bar_dummy') end - describe 'Sidekiq attribtues' do + describe 'Sidekiq attributes' do it 'sets the queue name based on the output of the router' do expect(worker.sidekiq_options['queue']).to eq('foo_bar_dummy') expect(router).to have_received(:route).with(worker).at_least(:once) @@ -152,7 +152,7 @@ def self.name allow(router).to receive(:route).and_return('foo_bar_dummy', 'some_namespace:foo_bar_dummy') end - it 'updates the quque name from the router again' do + it 'updates the queue name from the router again' do expect(worker.queue).to eq('foo_bar_dummy') worker.queue_namespace :some_namespace -- GitLab