diff --git a/Gemfile b/Gemfile index 458c7ca24..898d6802a 100644 --- a/Gemfile +++ b/Gemfile @@ -56,6 +56,7 @@ gem 'clockwork' gem 'puma-metrics' gem 'puma_worker_killer' gem "rack-timeout", require: "rack/timeout/base" +gem "parallel" group :production do gem 'unicorn', :platforms => :ruby diff --git a/Gemfile.lock b/Gemfile.lock index 7ba51b71a..bbc88a030 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -561,6 +561,7 @@ DEPENDENCIES newrelic_rpm nokogiri oauth2 + parallel pg pry-byebug pry-rails diff --git a/app/logical/bulk_update_request_processor.rb b/app/logical/bulk_update_request_processor.rb index d91f48c1e..6cf5c9078 100644 --- a/app/logical/bulk_update_request_processor.rb +++ b/app/logical/bulk_update_request_processor.rb @@ -137,45 +137,47 @@ class BulkUpdateRequestProcessor # Process the bulk update request immediately. def process! - commands.map do |command, *args| - case command - when :create_alias - TagAlias.approve!(antecedent_name: args[0], consequent_name: args[1], approver: approver, forum_topic: forum_topic) + CurrentUser.scoped(User.system, "127.0.0.1") do + commands.map do |command, *args| + case command + when :create_alias + TagAlias.approve!(antecedent_name: args[0], consequent_name: args[1], approver: approver, forum_topic: forum_topic) - when :create_implication - TagImplication.approve!(antecedent_name: args[0], consequent_name: args[1], approver: approver, forum_topic: forum_topic) + when :create_implication + TagImplication.approve!(antecedent_name: args[0], consequent_name: args[1], approver: approver, forum_topic: forum_topic) - when :remove_alias - tag_alias = TagAlias.active.find_by!(antecedent_name: args[0], consequent_name: args[1]) - tag_alias.reject!(User.system) + when :remove_alias + tag_alias = TagAlias.active.find_by!(antecedent_name: args[0], consequent_name: args[1]) + tag_alias.reject!(User.system) - when :remove_implication - tag_implication = TagImplication.active.find_by!(antecedent_name: args[0], consequent_name: args[1]) - tag_implication.reject!(User.system) + when :remove_implication + tag_implication = TagImplication.active.find_by!(antecedent_name: args[0], consequent_name: args[1]) + tag_implication.reject!(User.system) - when :mass_update - BulkUpdateRequestProcessor.mass_update(args[0], args[1]) + when :mass_update + BulkUpdateRequestProcessor.mass_update(args[0], args[1]) - when :nuke - BulkUpdateRequestProcessor.nuke(args[0]) + when :nuke + BulkUpdateRequestProcessor.nuke(args[0]) - when :rename - TagMover.new(args[0], args[1], user: User.system).move! + when :rename + TagMover.new(args[0], args[1], user: User.system).move! - when :change_category - tag = Tag.find_or_create_by_name(args[0]) - tag.update!(category: Tag.categories.value_for(args[1])) + when :change_category + tag = Tag.find_or_create_by_name(args[0]) + tag.update!(category: Tag.categories.value_for(args[1])) - else - # should never happen - raise Error, "Unknown command: #{command}" + else + # should never happen + raise Error, "Unknown command: #{command}" + end end - end - bulk_update_request.update!(status: "approved") - rescue StandardError - bulk_update_request.update!(status: "failed") - raise + bulk_update_request.update!(status: "approved") + rescue StandardError + bulk_update_request.update!(status: "failed") + raise + end end # The list of tags in the script. Used for search BURs by tag. @@ -255,7 +257,7 @@ class BulkUpdateRequestProcessor normalized_consequent = PostQueryBuilder.new(consequent).parse_tag_edit CurrentUser.scoped(user) do - Post.anon_tag_match(normalized_antecedent.join(" ")).find_each do |post| + Post.anon_tag_match(normalized_antecedent.join(" ")).reorder(nil).parallel_each do |post| post.with_lock do tags = (post.tag_array - normalized_antecedent + normalized_consequent).join(" ") post.update(tag_string: tags) diff --git a/app/logical/tag_mover.rb b/app/logical/tag_mover.rb index ae143e8e4..9930de237 100644 --- a/app/logical/tag_mover.rb +++ b/app/logical/tag_mover.rb @@ -69,7 +69,7 @@ class TagMover # Retag the posts from the old tag to the new tag. def move_posts! - Post.raw_tag_match(old_tag.name).find_each do |post| + Post.raw_tag_match(old_tag.name).reorder(nil).parallel_each do |post| post.lock! post.remove_tag(old_tag.name) post.add_tag(new_tag.name) diff --git a/app/models/application_record.rb b/app/models/application_record.rb index 07779931e..82850af8f 100644 --- a/app/models/application_record.rb +++ b/app/models/application_record.rb @@ -166,6 +166,33 @@ class ApplicationRecord < ActiveRecord::Base end end + concerning :ConcurrencyMethods do + class_methods do + def parallel_each(batch_size: 1000, in_processes: 4, in_threads: nil) + # XXX Use threads in testing because processes can't see each other's + # database transactions. + if Rails.env.test? + in_processes = nil + in_threads = 2 + end + + current_user = CurrentUser.user + current_ip = CurrentUser.ip_addr + + find_in_batches(batch_size: batch_size, error_on_ignore: true) do |batch| + Parallel.each(batch, in_processes: in_processes, in_threads: in_threads) do |record| + # XXX In threaded mode, the current user isn't inherited from the + # parent thread because the current user is a thread-local + # variable. Hence, we have to set it explicitly in the child thread. + CurrentUser.scoped(current_user, current_ip) do + yield record + end + end + end + end + end + end + def warnings @warnings ||= ActiveModel::Errors.new(self) end diff --git a/app/models/tag_implication.rb b/app/models/tag_implication.rb index 884094a91..d1062e927 100644 --- a/app/models/tag_implication.rb +++ b/app/models/tag_implication.rb @@ -146,7 +146,7 @@ class TagImplication < TagRelationship def update_posts! CurrentUser.scoped(User.system) do - Post.system_tag_match("#{antecedent_name} -#{consequent_name}").find_each do |post| + Post.system_tag_match("#{antecedent_name} -#{consequent_name}").reorder(nil).parallel_each do |post| post.lock! post.save! end diff --git a/test/unit/application_record.rb b/test/unit/application_record.rb index 543e43f3e..60355d482 100644 --- a/test/unit/application_record.rb +++ b/test/unit/application_record.rb @@ -21,4 +21,28 @@ class ApplicationRecordTest < ActiveSupport::TestCase assert_equal(@tags.reverse, Tag.search(updated_at: ">=#{@tags.first.updated_at}")) end end + + context "ApplicationRecord#parallel_each" do + context "in threaded mode" do + should "set CurrentUser correctly" do + @user1 = create(:user) + @user2 = create(:user) + + CurrentUser.scoped(@user1, "1.1.1.1") do + Tag.parallel_each do |tag| + assert_equal(@user1, CurrentUser.user) + assert_equal("1.1.1.1", CurrentUser.ip_addr) + + CurrentUser.scoped(@user2, "2.2.2.2") do + assert_equal(@user2, CurrentUser.user) + assert_equal("2.2.2.2", CurrentUser.ip_addr) + end + + assert_equal(@user1, CurrentUser.user) + assert_equal("1.1.1.1", CurrentUser.ip_addr) + end + end + end + end + end end diff --git a/test/unit/bulk_update_request_test.rb b/test/unit/bulk_update_request_test.rb index d5f9c0e3c..0930e0876 100644 --- a/test/unit/bulk_update_request_test.rb +++ b/test/unit/bulk_update_request_test.rb @@ -266,6 +266,7 @@ class BulkUpdateRequestTest < ActiveSupport::TestCase should "update the tags" do assert_equal("bar", @post.reload.tag_string) assert_equal("approved", @bur.reload.status) + assert_equal(User.system, @post.versions.last.updater) end should "be case-sensitive" do @@ -288,6 +289,7 @@ class BulkUpdateRequestTest < ActiveSupport::TestCase should "rename the tags" do assert_equal("bar blah", @post.reload.tag_string) assert_equal("approved", @bur.reload.status) + assert_equal(User.system, @post.versions.last.updater) end should "move the tag's artist entry and wiki page" do @@ -351,6 +353,7 @@ class BulkUpdateRequestTest < ActiveSupport::TestCase assert_equal("foo", @post.reload.tag_string) assert_equal("approved", @bur.reload.status) + assert_equal(User.system, @post.versions.last.updater) end should "remove implications" do @@ -370,6 +373,7 @@ class BulkUpdateRequestTest < ActiveSupport::TestCase assert_equal([], @pool.post_ids) assert_equal("approved", @bur.reload.status) + assert_equal(User.system, @pool.versions.last.updater) end end @@ -514,6 +518,10 @@ class BulkUpdateRequestTest < ActiveSupport::TestCase assert_equal("approved", @bur.reload.status) end + should "update the post as DanbooruBot" do + assert_equal(User.system, @post.versions.last.updater) + end + should "set the BUR as failed if there is an unexpected error during processing" do @bur = create(:bulk_update_request, script: "alias one -> two") TagAlias.any_instance.stubs(:process!).raises(RuntimeError.new("oh no")) diff --git a/test/unit/tag_alias_test.rb b/test/unit/tag_alias_test.rb index 8f614be00..e4b369231 100644 --- a/test/unit/tag_alias_test.rb +++ b/test/unit/tag_alias_test.rb @@ -142,6 +142,8 @@ class TagAliasTest < ActiveSupport::TestCase assert_equal("bbb ccc", post1.reload.tag_string) assert_equal("ccc ddd", post2.reload.tag_string) + assert_equal(User.system, post1.versions.last.updater) + assert_equal(CurrentUser.user, post2.versions.last.updater) end should "not validate for transitive relations" do diff --git a/test/unit/tag_implication_test.rb b/test/unit/tag_implication_test.rb index f0f8d3625..2b33b6207 100644 --- a/test/unit/tag_implication_test.rb +++ b/test/unit/tag_implication_test.rb @@ -124,6 +124,8 @@ class TagImplicationTest < ActiveSupport::TestCase assert_equal("sword weapon", p1.reload.tag_string) assert_equal("sword weapon", p2.reload.tag_string) + assert_equal(User.system, p1.versions.last.updater) + assert_equal(CurrentUser.user, p2.versions.last.updater) end context "when calculating implied tags" do