From d854bf6b53349f8e6f782e1ade8fd6c946b06621 Mon Sep 17 00:00:00 2001 From: evazion Date: Sun, 19 Sep 2021 22:30:54 -0500 Subject: [PATCH] BURs: update posts in parallel. When processing an alias, rename, implication, mass update, or nuke, update the posts in parallel. This means that if we alias foo to bar, for example, then we use four processes at once to retag the posts from foo to bar. This doesn't mean that if we have two aliases in a BUR, we process both aliases in parallel. It simply means that when processing an alias, we update the posts in parallel for that alias. --- Gemfile | 1 + Gemfile.lock | 1 + app/logical/bulk_update_request_processor.rb | 62 ++++++++++---------- app/logical/tag_mover.rb | 2 +- app/models/application_record.rb | 27 +++++++++ app/models/tag_implication.rb | 2 +- test/unit/application_record.rb | 24 ++++++++ test/unit/bulk_update_request_test.rb | 8 +++ test/unit/tag_alias_test.rb | 2 + test/unit/tag_implication_test.rb | 2 + 10 files changed, 99 insertions(+), 32 deletions(-) diff --git a/Gemfile b/Gemfile index 458c7ca24..898d6802a 100644 --- a/Gemfile +++ b/Gemfile @@ -56,6 +56,7 @@ gem 'clockwork' gem 'puma-metrics' gem 'puma_worker_killer' gem "rack-timeout", require: "rack/timeout/base" +gem "parallel" group :production do gem 'unicorn', :platforms => :ruby diff --git a/Gemfile.lock b/Gemfile.lock index 7ba51b71a..bbc88a030 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -561,6 +561,7 @@ DEPENDENCIES newrelic_rpm nokogiri oauth2 + parallel pg pry-byebug pry-rails diff --git a/app/logical/bulk_update_request_processor.rb b/app/logical/bulk_update_request_processor.rb index d91f48c1e..6cf5c9078 100644 --- a/app/logical/bulk_update_request_processor.rb +++ b/app/logical/bulk_update_request_processor.rb @@ -137,45 +137,47 @@ class BulkUpdateRequestProcessor # Process the bulk update request immediately. def process! - commands.map do |command, *args| - case command - when :create_alias - TagAlias.approve!(antecedent_name: args[0], consequent_name: args[1], approver: approver, forum_topic: forum_topic) + CurrentUser.scoped(User.system, "127.0.0.1") do + commands.map do |command, *args| + case command + when :create_alias + TagAlias.approve!(antecedent_name: args[0], consequent_name: args[1], approver: approver, forum_topic: forum_topic) - when :create_implication - TagImplication.approve!(antecedent_name: args[0], consequent_name: args[1], approver: approver, forum_topic: forum_topic) + when :create_implication + TagImplication.approve!(antecedent_name: args[0], consequent_name: args[1], approver: approver, forum_topic: forum_topic) - when :remove_alias - tag_alias = TagAlias.active.find_by!(antecedent_name: args[0], consequent_name: args[1]) - tag_alias.reject!(User.system) + when :remove_alias + tag_alias = TagAlias.active.find_by!(antecedent_name: args[0], consequent_name: args[1]) + tag_alias.reject!(User.system) - when :remove_implication - tag_implication = TagImplication.active.find_by!(antecedent_name: args[0], consequent_name: args[1]) - tag_implication.reject!(User.system) + when :remove_implication + tag_implication = TagImplication.active.find_by!(antecedent_name: args[0], consequent_name: args[1]) + tag_implication.reject!(User.system) - when :mass_update - BulkUpdateRequestProcessor.mass_update(args[0], args[1]) + when :mass_update + BulkUpdateRequestProcessor.mass_update(args[0], args[1]) - when :nuke - BulkUpdateRequestProcessor.nuke(args[0]) + when :nuke + BulkUpdateRequestProcessor.nuke(args[0]) - when :rename - TagMover.new(args[0], args[1], user: User.system).move! + when :rename + TagMover.new(args[0], args[1], user: User.system).move! - when :change_category - tag = Tag.find_or_create_by_name(args[0]) - tag.update!(category: Tag.categories.value_for(args[1])) + when :change_category + tag = Tag.find_or_create_by_name(args[0]) + tag.update!(category: Tag.categories.value_for(args[1])) - else - # should never happen - raise Error, "Unknown command: #{command}" + else + # should never happen + raise Error, "Unknown command: #{command}" + end end - end - bulk_update_request.update!(status: "approved") - rescue StandardError - bulk_update_request.update!(status: "failed") - raise + bulk_update_request.update!(status: "approved") + rescue StandardError + bulk_update_request.update!(status: "failed") + raise + end end # The list of tags in the script. Used for search BURs by tag. @@ -255,7 +257,7 @@ class BulkUpdateRequestProcessor normalized_consequent = PostQueryBuilder.new(consequent).parse_tag_edit CurrentUser.scoped(user) do - Post.anon_tag_match(normalized_antecedent.join(" ")).find_each do |post| + Post.anon_tag_match(normalized_antecedent.join(" ")).reorder(nil).parallel_each do |post| post.with_lock do tags = (post.tag_array - normalized_antecedent + normalized_consequent).join(" ") post.update(tag_string: tags) diff --git a/app/logical/tag_mover.rb b/app/logical/tag_mover.rb index ae143e8e4..9930de237 100644 --- a/app/logical/tag_mover.rb +++ b/app/logical/tag_mover.rb @@ -69,7 +69,7 @@ class TagMover # Retag the posts from the old tag to the new tag. def move_posts! - Post.raw_tag_match(old_tag.name).find_each do |post| + Post.raw_tag_match(old_tag.name).reorder(nil).parallel_each do |post| post.lock! post.remove_tag(old_tag.name) post.add_tag(new_tag.name) diff --git a/app/models/application_record.rb b/app/models/application_record.rb index 07779931e..82850af8f 100644 --- a/app/models/application_record.rb +++ b/app/models/application_record.rb @@ -166,6 +166,33 @@ class ApplicationRecord < ActiveRecord::Base end end + concerning :ConcurrencyMethods do + class_methods do + def parallel_each(batch_size: 1000, in_processes: 4, in_threads: nil) + # XXX Use threads in testing because processes can't see each other's + # database transactions. + if Rails.env.test? + in_processes = nil + in_threads = 2 + end + + current_user = CurrentUser.user + current_ip = CurrentUser.ip_addr + + find_in_batches(batch_size: batch_size, error_on_ignore: true) do |batch| + Parallel.each(batch, in_processes: in_processes, in_threads: in_threads) do |record| + # XXX In threaded mode, the current user isn't inherited from the + # parent thread because the current user is a thread-local + # variable. Hence, we have to set it explicitly in the child thread. + CurrentUser.scoped(current_user, current_ip) do + yield record + end + end + end + end + end + end + def warnings @warnings ||= ActiveModel::Errors.new(self) end diff --git a/app/models/tag_implication.rb b/app/models/tag_implication.rb index 884094a91..d1062e927 100644 --- a/app/models/tag_implication.rb +++ b/app/models/tag_implication.rb @@ -146,7 +146,7 @@ class TagImplication < TagRelationship def update_posts! CurrentUser.scoped(User.system) do - Post.system_tag_match("#{antecedent_name} -#{consequent_name}").find_each do |post| + Post.system_tag_match("#{antecedent_name} -#{consequent_name}").reorder(nil).parallel_each do |post| post.lock! post.save! end diff --git a/test/unit/application_record.rb b/test/unit/application_record.rb index 543e43f3e..60355d482 100644 --- a/test/unit/application_record.rb +++ b/test/unit/application_record.rb @@ -21,4 +21,28 @@ class ApplicationRecordTest < ActiveSupport::TestCase assert_equal(@tags.reverse, Tag.search(updated_at: ">=#{@tags.first.updated_at}")) end end + + context "ApplicationRecord#parallel_each" do + context "in threaded mode" do + should "set CurrentUser correctly" do + @user1 = create(:user) + @user2 = create(:user) + + CurrentUser.scoped(@user1, "1.1.1.1") do + Tag.parallel_each do |tag| + assert_equal(@user1, CurrentUser.user) + assert_equal("1.1.1.1", CurrentUser.ip_addr) + + CurrentUser.scoped(@user2, "2.2.2.2") do + assert_equal(@user2, CurrentUser.user) + assert_equal("2.2.2.2", CurrentUser.ip_addr) + end + + assert_equal(@user1, CurrentUser.user) + assert_equal("1.1.1.1", CurrentUser.ip_addr) + end + end + end + end + end end diff --git a/test/unit/bulk_update_request_test.rb b/test/unit/bulk_update_request_test.rb index d5f9c0e3c..0930e0876 100644 --- a/test/unit/bulk_update_request_test.rb +++ b/test/unit/bulk_update_request_test.rb @@ -266,6 +266,7 @@ class BulkUpdateRequestTest < ActiveSupport::TestCase should "update the tags" do assert_equal("bar", @post.reload.tag_string) assert_equal("approved", @bur.reload.status) + assert_equal(User.system, @post.versions.last.updater) end should "be case-sensitive" do @@ -288,6 +289,7 @@ class BulkUpdateRequestTest < ActiveSupport::TestCase should "rename the tags" do assert_equal("bar blah", @post.reload.tag_string) assert_equal("approved", @bur.reload.status) + assert_equal(User.system, @post.versions.last.updater) end should "move the tag's artist entry and wiki page" do @@ -351,6 +353,7 @@ class BulkUpdateRequestTest < ActiveSupport::TestCase assert_equal("foo", @post.reload.tag_string) assert_equal("approved", @bur.reload.status) + assert_equal(User.system, @post.versions.last.updater) end should "remove implications" do @@ -370,6 +373,7 @@ class BulkUpdateRequestTest < ActiveSupport::TestCase assert_equal([], @pool.post_ids) assert_equal("approved", @bur.reload.status) + assert_equal(User.system, @pool.versions.last.updater) end end @@ -514,6 +518,10 @@ class BulkUpdateRequestTest < ActiveSupport::TestCase assert_equal("approved", @bur.reload.status) end + should "update the post as DanbooruBot" do + assert_equal(User.system, @post.versions.last.updater) + end + should "set the BUR as failed if there is an unexpected error during processing" do @bur = create(:bulk_update_request, script: "alias one -> two") TagAlias.any_instance.stubs(:process!).raises(RuntimeError.new("oh no")) diff --git a/test/unit/tag_alias_test.rb b/test/unit/tag_alias_test.rb index 8f614be00..e4b369231 100644 --- a/test/unit/tag_alias_test.rb +++ b/test/unit/tag_alias_test.rb @@ -142,6 +142,8 @@ class TagAliasTest < ActiveSupport::TestCase assert_equal("bbb ccc", post1.reload.tag_string) assert_equal("ccc ddd", post2.reload.tag_string) + assert_equal(User.system, post1.versions.last.updater) + assert_equal(CurrentUser.user, post2.versions.last.updater) end should "not validate for transitive relations" do diff --git a/test/unit/tag_implication_test.rb b/test/unit/tag_implication_test.rb index f0f8d3625..2b33b6207 100644 --- a/test/unit/tag_implication_test.rb +++ b/test/unit/tag_implication_test.rb @@ -124,6 +124,8 @@ class TagImplicationTest < ActiveSupport::TestCase assert_equal("sword weapon", p1.reload.tag_string) assert_equal("sword weapon", p2.reload.tag_string) + assert_equal(User.system, p1.versions.last.updater) + assert_equal(CurrentUser.user, p2.versions.last.updater) end context "when calculating implied tags" do