diff --git a/app/services/activity_pub/accept_follow_service.rb b/app/services/activity_pub/accept_follow_service.rb new file mode 100644 index 0000000000000000000000000000000000000000..0ec440fa97266858efa3b32977bb5c2956f355fb --- /dev/null +++ b/app/services/activity_pub/accept_follow_service.rb @@ -0,0 +1,55 @@ +# frozen_string_literal: true + +module ActivityPub + class AcceptFollowService + MissingInboxURLError = Class.new(StandardError) + + attr_reader :subscription, :actor + + def initialize(subscription, actor) + @subscription = subscription + @actor = actor + end + + def execute + return if subscription.accepted? + raise MissingInboxURLError unless subscription.subscriber_inbox_url.present? + + upload_accept_activity + subscription.accepted! + end + + private + + def upload_accept_activity + body = Gitlab::Json::LimitedEncoder.encode(payload, limit: 1.megabyte) + + begin + Gitlab::HTTP.post(subscription.subscriber_inbox_url, body: body, headers: headers) + rescue StandardError => e + raise ThirdPartyError, e.message + end + end + + def payload + follow = subscription.payload.dup + follow.delete('@context') + + { + '@context': 'https://www.w3.org/ns/activitystreams', + id: "#{actor}#follow/#{subscription.id}/accept", + type: 'Accept', + actor: actor, + object: follow + } + end + + def headers + { + 'User-Agent' => "GitLab/#{Gitlab::VERSION}", + 'Content-Type' => 'application/ld+json; profile="https://www.w3.org/ns/activitystreams"', + 'Accept' => 'application/ld+json; profile="https://www.w3.org/ns/activitystreams"' + } + end + end +end diff --git a/app/services/activity_pub/inbox_resolver_service.rb b/app/services/activity_pub/inbox_resolver_service.rb new file mode 100644 index 0000000000000000000000000000000000000000..c2bd2112b16468d960cec9e5a3ac4792292d1643 --- /dev/null +++ b/app/services/activity_pub/inbox_resolver_service.rb @@ -0,0 +1,50 @@ +# frozen_string_literal: true + +module ActivityPub + class InboxResolverService + attr_reader :subscription + + def initialize(subscription) + @subscription = subscription + end + + def execute + profile = subscriber_profile + unless profile.has_key?('inbox') && profile['inbox'].is_a?(String) + raise ThirdPartyError, 'Inbox parameter absent or invalid' + end + + subscription.subscriber_inbox_url = profile['inbox'] + subscription.shared_inbox_url = profile.dig('entrypoints', 'sharedInbox') + subscription.save! + end + + private + + def subscriber_profile + raw_data = download_subscriber_profile + + begin + profile = Gitlab::Json.parse(raw_data) + rescue JSON::ParserError => e + raise ThirdPartyError, e.message + end + + profile + end + + def download_subscriber_profile + begin + response = Gitlab::HTTP.get(subscription.subscriber_url, + headers: { + 'Accept' => 'application/ld+json; profile="https://www.w3.org/ns/activitystreams"' + } + ) + rescue StandardError => e + raise ThirdPartyError, e.message + end + + response.body + end + end +end diff --git a/app/services/activity_pub/third_party_error.rb b/app/services/activity_pub/third_party_error.rb new file mode 100644 index 0000000000000000000000000000000000000000..473a67984a45c0de19892b96eccca3d1882fa775 --- /dev/null +++ b/app/services/activity_pub/third_party_error.rb @@ -0,0 +1,5 @@ +# frozen_string_literal: true + +module ActivityPub + ThirdPartyError = Class.new(StandardError) +end diff --git a/app/workers/activity_pub/projects/releases_subscription_worker.rb b/app/workers/activity_pub/projects/releases_subscription_worker.rb new file mode 100644 index 0000000000000000000000000000000000000000..c392726a4697c4a27b238287c31094920e236e61 --- /dev/null +++ b/app/workers/activity_pub/projects/releases_subscription_worker.rb @@ -0,0 +1,39 @@ +# frozen_string_literal: true + +module ActivityPub + module Projects + class ReleasesSubscriptionWorker + include ApplicationWorker + include Gitlab::Routing.url_helpers + + idempotent! + worker_has_external_dependencies! + feature_category :release_orchestration + data_consistency :delayed + queue_namespace :activity_pub + + sidekiq_retries_exhausted do |msg, _ex| + subscription_id = msg['args'].second + subscription = ActivityPub::ReleasesSubscription.find_by_id(subscription_id) + subscription&.destroy + end + + def perform(subscription_id) + subscription = ActivityPub::ReleasesSubscription.find_by_id(subscription_id) + return if subscription.nil? + + unless subscription.project.public? + subscription.destroy + return + end + + InboxResolverService.new(subscription).execute if needs_resolving?(subscription) + AcceptFollowService.new(subscription, project_releases_url(subscription.project)).execute + end + + def needs_resolving?(subscription) + subscription.subscriber_inbox_url.blank? || subscription.shared_inbox_url.blank? + end + end + end +end diff --git a/app/workers/all_queues.yml b/app/workers/all_queues.yml index e5b860ba5255e0fe7cc39386c8f2ce0f8e505a4d..c272a3abcd51e0170ecb6b1181bd9d0591e7e099 100644 --- a/app/workers/all_queues.yml +++ b/app/workers/all_queues.yml @@ -3,6 +3,15 @@ # # Do not edit it manually! --- +- :name: activity_pub:activity_pub_projects_releases_subscription + :worker_name: ActivityPub::Projects::ReleasesSubscriptionWorker + :feature_category: :release_orchestration + :has_external_dependencies: true + :urgency: :low + :resource_boundary: :unknown + :weight: 1 + :idempotent: true + :tags: [] - :name: authorized_project_update:authorized_project_update_project_recalculate :worker_name: AuthorizedProjectUpdate::ProjectRecalculateWorker :feature_category: :system_access diff --git a/config/sidekiq_queues.yml b/config/sidekiq_queues.yml index 1f0b4840a8e443f6f80132b9925dc0d0e44897e9..bf0f6b4d3fcf0a5792131e3f4e533c9295e6e5bd 100644 --- a/config/sidekiq_queues.yml +++ b/config/sidekiq_queues.yml @@ -25,6 +25,8 @@ :queues: - - abuse_new_abuse_report - 1 +- - activity_pub + - 1 - - adjourned_project_deletion - 1 - - admin_emails diff --git a/spec/services/activity_pub/accept_follow_service_spec.rb b/spec/services/activity_pub/accept_follow_service_spec.rb new file mode 100644 index 0000000000000000000000000000000000000000..0f472412085f4bb8170f88a144237bfc4cc6fc47 --- /dev/null +++ b/spec/services/activity_pub/accept_follow_service_spec.rb @@ -0,0 +1,77 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe ActivityPub::AcceptFollowService, feature_category: :integrations do + let_it_be(:project) { create(:project, :public) } + let_it_be_with_reload(:existing_subscription) do + create(:activity_pub_releases_subscription, :inbox, project: project) + end + + let(:service) { described_class.new(existing_subscription, 'http://localhost/my-project/releases') } + + describe '#execute' do + context 'when third party server complies' do + before do + allow(Gitlab::HTTP).to receive(:post).and_return(true) + service.execute + end + + it 'sends an Accept activity' do + expect(Gitlab::HTTP).to have_received(:post) + end + + it 'updates subscription state to accepted' do + expect(existing_subscription.reload.status).to eq 'accepted' + end + end + + context 'when there is an error with third party server' do + before do + allow(Gitlab::HTTP).to receive(:post).and_raise(Errno::ECONNREFUSED) + end + + it 'raises a ThirdPartyError' do + expect { service.execute }.to raise_error(ActivityPub::ThirdPartyError) + end + + it 'does not update subscription state to accepted' do + begin + service.execute + rescue StandardError + end + + expect(existing_subscription.reload.status).to eq 'requested' + end + end + + context 'when subscription is already accepted' do + before do + allow(Gitlab::HTTP).to receive(:post).and_return(true) + allow(existing_subscription).to receive(:accepted!).and_return(true) + existing_subscription.status = :accepted + service.execute + end + + it 'does not send an Accept activity' do + expect(Gitlab::HTTP).not_to have_received(:post) + end + + it 'does not update subscription state' do + expect(existing_subscription).not_to have_received(:accepted!) + end + end + + context 'when inbox has not been resolved' do + before do + allow(Gitlab::HTTP).to receive(:post).and_return(true) + allow(existing_subscription).to receive(:accepted!).and_return(true) + end + + it 'raises an error' do + existing_subscription.subscriber_inbox_url = nil + expect { service.execute }.to raise_error(ActivityPub::AcceptFollowService::MissingInboxURLError) + end + end + end +end diff --git a/spec/services/activity_pub/inbox_resolver_service_spec.rb b/spec/services/activity_pub/inbox_resolver_service_spec.rb new file mode 100644 index 0000000000000000000000000000000000000000..29048045bb5d0d7c52cb821f3993c6e4da53f1c0 --- /dev/null +++ b/spec/services/activity_pub/inbox_resolver_service_spec.rb @@ -0,0 +1,99 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe ActivityPub::InboxResolverService, feature_category: :integrations do + let_it_be(:project) { create(:project, :public) } + let_it_be_with_reload(:existing_subscription) { create(:activity_pub_releases_subscription, project: project) } + let(:service) { described_class.new(existing_subscription) } + + shared_examples 'third party error' do + it 'raises a ThirdPartyError' do + expect { service.execute }.to raise_error(ActivityPub::ThirdPartyError) + end + + it 'does not update the subscription record' do + begin + service.execute + rescue StandardError + end + + expect(ActivityPub::ReleasesSubscription.last.subscriber_inbox_url).not_to eq 'https://example.com/user/inbox' + end + end + + describe '#execute' do + context 'with successful HTTP request' do + before do + allow(Gitlab::HTTP).to receive(:get) { response } + end + + let(:response) { instance_double(HTTParty::Response, body: body) } + + context 'with a JSON response' do + let(:body) do + { + '@context': 'https://www.w3.org/ns/activitystreams', + id: 'https://example.com/user', + type: 'Person', + **inbox, + **entrypoints, + outbox: 'https://example.com/user/outbox' + }.to_json + end + + let(:entrypoints) { {} } + + context 'with valid response' do + let(:inbox) { { inbox: 'https://example.com/user/inbox' } } + + context 'without a shared inbox' do + it 'updates only the inbox in the subscription record' do + service.execute + + expect(ActivityPub::ReleasesSubscription.last.subscriber_inbox_url).to eq 'https://example.com/user/inbox' + expect(ActivityPub::ReleasesSubscription.last.shared_inbox_url).to be_nil + end + end + + context 'with a shared inbox' do + let(:entrypoints) { { entrypoints: { sharedInbox: 'https://example.com/shared-inbox' } } } + + it 'updates both the inbox and shared inbox in the subscription record' do + service.execute + + expect(ActivityPub::ReleasesSubscription.last.subscriber_inbox_url).to eq 'https://example.com/user/inbox' + expect(ActivityPub::ReleasesSubscription.last.shared_inbox_url).to eq 'https://example.com/shared-inbox' + end + end + end + + context 'without inbox attribute' do + let(:inbox) { {} } + + it_behaves_like 'third party error' + end + + context 'with a non string inbox attribute' do + let(:inbox) { { inbox: 27.13 } } + + it_behaves_like 'third party error' + end + end + + context 'with non JSON response' do + let(:body) { '
woops
' } + + it_behaves_like 'third party error' + end + end + + context 'with http error' do + before do + allow(Gitlab::HTTP).to receive(:get).and_raise(Errno::ECONNREFUSED) + end + + it_behaves_like 'third party error' + end + end +end diff --git a/spec/workers/activity_pub/projects/releases_subscription_worker_spec.rb b/spec/workers/activity_pub/projects/releases_subscription_worker_spec.rb new file mode 100644 index 0000000000000000000000000000000000000000..c41c1bb8e1c9ee0d4c94ba00662cd18503018a13 --- /dev/null +++ b/spec/workers/activity_pub/projects/releases_subscription_worker_spec.rb @@ -0,0 +1,128 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe ActivityPub::Projects::ReleasesSubscriptionWorker, feature_category: :release_orchestration do + describe '#perform' do + let(:worker) { described_class.new } + let(:project) { build_stubbed :project, :public } + let(:subscription) { build_stubbed :activity_pub_releases_subscription, project: project } + let(:inbox_resolver_service) { instance_double('ActivityPub::InboxResolverService', execute: true) } + let(:accept_follow_service) { instance_double('ActivityPub::AcceptFollowService', execute: true) } + + before do + allow(ActivityPub::ReleasesSubscription).to receive(:find_by_id) { subscription } + allow(subscription).to receive(:destroy).and_return(true) + allow(ActivityPub::InboxResolverService).to receive(:new) { inbox_resolver_service } + allow(ActivityPub::AcceptFollowService).to receive(:new) { accept_follow_service } + end + + context 'when the project is public' do + before do + worker.perform(subscription.id) + end + + context 'when inbox url has not been resolved yet' do + it 'calls the service to resolve the inbox url' do + expect(inbox_resolver_service).to have_received(:execute) + end + + it 'calls the service to send out the Accept activity' do + expect(accept_follow_service).to have_received(:execute) + end + end + + context 'when inbox url has been resolved' do + context 'when shared inbox url has not been resolved' do + let(:subscription) { build_stubbed :activity_pub_releases_subscription, :inbox, project: project } + + it 'calls the service to resolve the inbox url' do + expect(inbox_resolver_service).to have_received(:execute) + end + + it 'calls the service to send out the Accept activity' do + expect(accept_follow_service).to have_received(:execute) + end + end + + context 'when shared inbox url has been resolved' do + let(:subscription) do + build_stubbed :activity_pub_releases_subscription, :inbox, :shared_inbox, project: project + end + + it 'does not call the service to resolve the inbox url' do + expect(inbox_resolver_service).not_to have_received(:execute) + end + + it 'calls the service to send out the Accept activity' do + expect(accept_follow_service).to have_received(:execute) + end + end + end + end + + shared_examples 'failed job' do + it 'does not resolve inbox url' do + expect(inbox_resolver_service).not_to have_received(:execute) + end + + it 'does not send out Accept activity' do + expect(accept_follow_service).not_to have_received(:execute) + end + end + + context 'when the subscription does not exist' do + before do + allow(ActivityPub::ReleasesSubscription).to receive(:find_by_id).and_return(nil) + worker.perform(subscription.id) + end + + it_behaves_like 'failed job' + end + + shared_examples 'non public project' do + it_behaves_like 'failed job' + + it 'deletes the subscription' do + expect(subscription).to have_received(:destroy) + end + end + + context 'when project has changed to internal' do + before do + worker.perform(subscription.id) + end + + let(:project) { build_stubbed :project, :internal } + + it_behaves_like 'non public project' + end + + context 'when project has changed to private' do + before do + worker.perform(subscription.id) + end + + let(:project) { build_stubbed :project, :private } + + it_behaves_like 'non public project' + end + end + + describe '#sidekiq_retries_exhausted' do + let(:project) { build_stubbed :project, :public } + let(:subscription) { build_stubbed :activity_pub_releases_subscription, project: project } + let(:job) { { 'args' => [project.id, subscription.id], 'error_message' => 'Error' } } + + before do + allow(Project).to receive(:find) { project } + allow(ActivityPub::ReleasesSubscription).to receive(:find_by_id) { subscription } + end + + it 'delete the subscription' do + expect(subscription).to receive(:destroy) + + described_class.sidekiq_retries_exhausted_block.call(job, StandardError.new) + end + end +end