diff --git a/app/services/fan_out_on_write_service.rb b/app/services/fan_out_on_write_service.rb index 2554756a5..f2a79c9fc 100644 --- a/app/services/fan_out_on_write_service.rb +++ b/app/services/fan_out_on_write_service.rb @@ -8,6 +8,7 @@ class FanOutOnWriteService < BaseService # @param [Hash] options # @option options [Boolean] update # @option options [Array] silenced_account_ids + # @option options [Boolean] skip_notifications def call(status, options = {}) @status = status @account = status.account @@ -37,8 +38,11 @@ class FanOutOnWriteService < BaseService def fan_out_to_local_recipients! deliver_to_self! - notify_mentioned_accounts! - notify_about_update! if update? + + unless @options[:skip_notifications] + notify_mentioned_accounts! + notify_about_update! if update? + end case @status.visibility.to_sym when :public, :unlisted, :private diff --git a/app/workers/thread_resolve_worker.rb b/app/workers/thread_resolve_worker.rb index 3206c45f6..d4cefb3fd 100644 --- a/app/workers/thread_resolve_worker.rb +++ b/app/workers/thread_resolve_worker.rb @@ -7,13 +7,18 @@ class ThreadResolveWorker sidekiq_options queue: 'pull', retry: 3 def perform(child_status_id, parent_url, options = {}) - child_status = Status.find(child_status_id) - parent_status = FetchRemoteStatusService.new.call(parent_url, **options.deep_symbolize_keys) + child_status = Status.find(child_status_id) + return if child_status.in_reply_to_id.present? + + parent_status = ActivityPub::TagManager.instance.uri_to_resource(parent_url, Status) + parent_status ||= FetchRemoteStatusService.new.call(parent_url, **options.deep_symbolize_keys) return if parent_status.nil? child_status.thread = parent_status child_status.save! + + DistributionWorker.perform_async(child_status_id, { 'skip_notifications' => true }) if child_status.within_realtime_window? rescue ActiveRecord::RecordNotFound true end diff --git a/spec/lib/activitypub/activity/create_spec.rb b/spec/lib/activitypub/activity/create_spec.rb index f6c24754c..8425f2127 100644 --- a/spec/lib/activitypub/activity/create_spec.rb +++ b/spec/lib/activitypub/activity/create_spec.rb @@ -23,6 +23,109 @@ RSpec.describe ActivityPub::Activity::Create do stub_request(:get, 'http://example.com/emojib.png').to_return(body: attachment_fixture('emojo.png'), headers: { 'Content-Type' => 'application/octet-stream' }) end + describe 'processing posts received out of order' do + let(:follower) { Fabricate(:account, username: 'bob') } + + let(:object_json) do + { + id: [ActivityPub::TagManager.instance.uri_for(sender), 'post1'].join('/'), + type: 'Note', + to: [ + 'https://www.w3.org/ns/activitystreams#Public', + ActivityPub::TagManager.instance.uri_for(follower), + ], + content: '@bob lorem ipsum', + published: 1.hour.ago.utc.iso8601, + updated: 1.hour.ago.utc.iso8601, + tag: { + type: 'Mention', + href: ActivityPub::TagManager.instance.uri_for(follower), + }, + } + end + + let(:reply_json) do + { + id: [ActivityPub::TagManager.instance.uri_for(sender), 'reply'].join('/'), + type: 'Note', + inReplyTo: object_json[:id], + to: [ + 'https://www.w3.org/ns/activitystreams#Public', + ActivityPub::TagManager.instance.uri_for(follower), + ], + content: '@bob lorem ipsum', + published: Time.now.utc.iso8601, + updated: Time.now.utc.iso8601, + tag: { + type: 'Mention', + href: ActivityPub::TagManager.instance.uri_for(follower), + }, + } + end + + def activity_for_object(json) + { + '@context': 'https://www.w3.org/ns/activitystreams', + id: [json[:id], 'activity'].join('/'), + type: 'Create', + actor: ActivityPub::TagManager.instance.uri_for(sender), + object: json, + }.with_indifferent_access + end + + before do + follower.follow!(sender) + end + + around do |example| + Sidekiq::Testing.fake! do + example.run + Sidekiq::Worker.clear_all + end + end + + it 'correctly processes posts and inserts them in timelines', :aggregate_failures do + # Simulate a temporary failure preventing from fetching the parent post + stub_request(:get, object_json[:id]).to_return(status: 500) + + # When receiving the reply… + described_class.new(activity_for_object(reply_json), sender, delivery: true).perform + + # NOTE: Refering explicitly to the workers is a bit awkward + DistributionWorker.drain + FeedInsertWorker.drain + + # …it creates a status with an unknown parent + reply = Status.find_by(uri: reply_json[:id]) + expect(reply.reply?).to be true + expect(reply.in_reply_to_id).to be_nil + + # …and creates a notification + expect(LocalNotificationWorker.jobs.size).to eq 1 + + # …but does not insert it into timelines + expect(redis.zscore(FeedManager.instance.key(:home, follower.id), reply.id)).to be_nil + + # When receiving the parent… + described_class.new(activity_for_object(object_json), sender, delivery: true).perform + + Sidekiq::Worker.drain_all + + # …it creates a status and insert it into timelines + parent = Status.find_by(uri: object_json[:id]) + expect(parent.reply?).to be false + expect(parent.in_reply_to_id).to be_nil + expect(reply.reload.in_reply_to_id).to eq parent.id + + # Check that the both statuses have been inserted into the home feed + expect(redis.zscore(FeedManager.instance.key(:home, follower.id), parent.id)).to be_within(0.1).of(parent.id.to_f) + expect(redis.zscore(FeedManager.instance.key(:home, follower.id), reply.id)).to be_within(0.1).of(reply.id.to_f) + + # Creates two notifications + expect(Notification.count).to eq 2 + end + end + describe '#perform' do context 'when fetching' do subject { described_class.new(json, sender) }