BURs: update posts in parallel.

When processing an alias, rename, implication, mass update, or nuke,
update the posts in parallel. This means that if we alias foo to bar,
for example, then we use four processes at once to retag the posts from
foo to bar.

This doesn't mean that if we have two aliases in a BUR, we process both
aliases in parallel. It simply means that when processing an alias, we
update the posts in parallel for that alias.
This commit is contained in:
evazion
2021-09-19 22:30:54 -05:00
parent 21f0c2acc3
commit d854bf6b53
10 changed files with 99 additions and 32 deletions

View File

@@ -56,6 +56,7 @@ gem 'clockwork'
gem 'puma-metrics'
gem 'puma_worker_killer'
gem "rack-timeout", require: "rack/timeout/base"
gem "parallel"
group :production do
gem 'unicorn', :platforms => :ruby

View File

@@ -561,6 +561,7 @@ DEPENDENCIES
newrelic_rpm
nokogiri
oauth2
parallel
pg
pry-byebug
pry-rails

View File

@@ -137,45 +137,47 @@ class BulkUpdateRequestProcessor
# Process the bulk update request immediately.
def process!
commands.map do |command, *args|
case command
when :create_alias
TagAlias.approve!(antecedent_name: args[0], consequent_name: args[1], approver: approver, forum_topic: forum_topic)
CurrentUser.scoped(User.system, "127.0.0.1") do
commands.map do |command, *args|
case command
when :create_alias
TagAlias.approve!(antecedent_name: args[0], consequent_name: args[1], approver: approver, forum_topic: forum_topic)
when :create_implication
TagImplication.approve!(antecedent_name: args[0], consequent_name: args[1], approver: approver, forum_topic: forum_topic)
when :create_implication
TagImplication.approve!(antecedent_name: args[0], consequent_name: args[1], approver: approver, forum_topic: forum_topic)
when :remove_alias
tag_alias = TagAlias.active.find_by!(antecedent_name: args[0], consequent_name: args[1])
tag_alias.reject!(User.system)
when :remove_alias
tag_alias = TagAlias.active.find_by!(antecedent_name: args[0], consequent_name: args[1])
tag_alias.reject!(User.system)
when :remove_implication
tag_implication = TagImplication.active.find_by!(antecedent_name: args[0], consequent_name: args[1])
tag_implication.reject!(User.system)
when :remove_implication
tag_implication = TagImplication.active.find_by!(antecedent_name: args[0], consequent_name: args[1])
tag_implication.reject!(User.system)
when :mass_update
BulkUpdateRequestProcessor.mass_update(args[0], args[1])
when :mass_update
BulkUpdateRequestProcessor.mass_update(args[0], args[1])
when :nuke
BulkUpdateRequestProcessor.nuke(args[0])
when :nuke
BulkUpdateRequestProcessor.nuke(args[0])
when :rename
TagMover.new(args[0], args[1], user: User.system).move!
when :rename
TagMover.new(args[0], args[1], user: User.system).move!
when :change_category
tag = Tag.find_or_create_by_name(args[0])
tag.update!(category: Tag.categories.value_for(args[1]))
when :change_category
tag = Tag.find_or_create_by_name(args[0])
tag.update!(category: Tag.categories.value_for(args[1]))
else
# should never happen
raise Error, "Unknown command: #{command}"
else
# should never happen
raise Error, "Unknown command: #{command}"
end
end
end
bulk_update_request.update!(status: "approved")
rescue StandardError
bulk_update_request.update!(status: "failed")
raise
bulk_update_request.update!(status: "approved")
rescue StandardError
bulk_update_request.update!(status: "failed")
raise
end
end
# The list of tags in the script. Used for search BURs by tag.
@@ -255,7 +257,7 @@ class BulkUpdateRequestProcessor
normalized_consequent = PostQueryBuilder.new(consequent).parse_tag_edit
CurrentUser.scoped(user) do
Post.anon_tag_match(normalized_antecedent.join(" ")).find_each do |post|
Post.anon_tag_match(normalized_antecedent.join(" ")).reorder(nil).parallel_each do |post|
post.with_lock do
tags = (post.tag_array - normalized_antecedent + normalized_consequent).join(" ")
post.update(tag_string: tags)

View File

@@ -69,7 +69,7 @@ class TagMover
# Retag the posts from the old tag to the new tag.
def move_posts!
Post.raw_tag_match(old_tag.name).find_each do |post|
Post.raw_tag_match(old_tag.name).reorder(nil).parallel_each do |post|
post.lock!
post.remove_tag(old_tag.name)
post.add_tag(new_tag.name)

View File

@@ -166,6 +166,33 @@ class ApplicationRecord < ActiveRecord::Base
end
end
concerning :ConcurrencyMethods do
class_methods do
def parallel_each(batch_size: 1000, in_processes: 4, in_threads: nil)
# XXX Use threads in testing because processes can't see each other's
# database transactions.
if Rails.env.test?
in_processes = nil
in_threads = 2
end
current_user = CurrentUser.user
current_ip = CurrentUser.ip_addr
find_in_batches(batch_size: batch_size, error_on_ignore: true) do |batch|
Parallel.each(batch, in_processes: in_processes, in_threads: in_threads) do |record|
# XXX In threaded mode, the current user isn't inherited from the
# parent thread because the current user is a thread-local
# variable. Hence, we have to set it explicitly in the child thread.
CurrentUser.scoped(current_user, current_ip) do
yield record
end
end
end
end
end
end
def warnings
@warnings ||= ActiveModel::Errors.new(self)
end

View File

@@ -146,7 +146,7 @@ class TagImplication < TagRelationship
def update_posts!
CurrentUser.scoped(User.system) do
Post.system_tag_match("#{antecedent_name} -#{consequent_name}").find_each do |post|
Post.system_tag_match("#{antecedent_name} -#{consequent_name}").reorder(nil).parallel_each do |post|
post.lock!
post.save!
end

View File

@@ -21,4 +21,28 @@ class ApplicationRecordTest < ActiveSupport::TestCase
assert_equal(@tags.reverse, Tag.search(updated_at: ">=#{@tags.first.updated_at}"))
end
end
context "ApplicationRecord#parallel_each" do
context "in threaded mode" do
should "set CurrentUser correctly" do
@user1 = create(:user)
@user2 = create(:user)
CurrentUser.scoped(@user1, "1.1.1.1") do
Tag.parallel_each do |tag|
assert_equal(@user1, CurrentUser.user)
assert_equal("1.1.1.1", CurrentUser.ip_addr)
CurrentUser.scoped(@user2, "2.2.2.2") do
assert_equal(@user2, CurrentUser.user)
assert_equal("2.2.2.2", CurrentUser.ip_addr)
end
assert_equal(@user1, CurrentUser.user)
assert_equal("1.1.1.1", CurrentUser.ip_addr)
end
end
end
end
end
end

View File

@@ -266,6 +266,7 @@ class BulkUpdateRequestTest < ActiveSupport::TestCase
should "update the tags" do
assert_equal("bar", @post.reload.tag_string)
assert_equal("approved", @bur.reload.status)
assert_equal(User.system, @post.versions.last.updater)
end
should "be case-sensitive" do
@@ -288,6 +289,7 @@ class BulkUpdateRequestTest < ActiveSupport::TestCase
should "rename the tags" do
assert_equal("bar blah", @post.reload.tag_string)
assert_equal("approved", @bur.reload.status)
assert_equal(User.system, @post.versions.last.updater)
end
should "move the tag's artist entry and wiki page" do
@@ -351,6 +353,7 @@ class BulkUpdateRequestTest < ActiveSupport::TestCase
assert_equal("foo", @post.reload.tag_string)
assert_equal("approved", @bur.reload.status)
assert_equal(User.system, @post.versions.last.updater)
end
should "remove implications" do
@@ -370,6 +373,7 @@ class BulkUpdateRequestTest < ActiveSupport::TestCase
assert_equal([], @pool.post_ids)
assert_equal("approved", @bur.reload.status)
assert_equal(User.system, @pool.versions.last.updater)
end
end
@@ -514,6 +518,10 @@ class BulkUpdateRequestTest < ActiveSupport::TestCase
assert_equal("approved", @bur.reload.status)
end
should "update the post as DanbooruBot" do
assert_equal(User.system, @post.versions.last.updater)
end
should "set the BUR as failed if there is an unexpected error during processing" do
@bur = create(:bulk_update_request, script: "alias one -> two")
TagAlias.any_instance.stubs(:process!).raises(RuntimeError.new("oh no"))

View File

@@ -142,6 +142,8 @@ class TagAliasTest < ActiveSupport::TestCase
assert_equal("bbb ccc", post1.reload.tag_string)
assert_equal("ccc ddd", post2.reload.tag_string)
assert_equal(User.system, post1.versions.last.updater)
assert_equal(CurrentUser.user, post2.versions.last.updater)
end
should "not validate for transitive relations" do

View File

@@ -124,6 +124,8 @@ class TagImplicationTest < ActiveSupport::TestCase
assert_equal("sword weapon", p1.reload.tag_string)
assert_equal("sword weapon", p2.reload.tag_string)
assert_equal(User.system, p1.versions.last.updater)
assert_equal(CurrentUser.user, p2.versions.last.updater)
end
context "when calculating implied tags" do