BURs: update posts in parallel.

When processing an alias, rename, implication, mass update, or nuke,
update the posts in parallel. This means that if we alias foo to bar,
for example, then we use four processes at once to retag the posts from
foo to bar.

This doesn't mean that two aliases in the same BUR are processed in
parallel with each other; the aliases are still processed one at a time.
It simply means that, for each alias, the posts are updated in parallel.
This commit is contained in:
evazion
2021-09-19 22:30:54 -05:00
parent 21f0c2acc3
commit d854bf6b53
10 changed files with 99 additions and 32 deletions

View File

@@ -56,6 +56,7 @@ gem 'clockwork'
gem 'puma-metrics' gem 'puma-metrics'
gem 'puma_worker_killer' gem 'puma_worker_killer'
gem "rack-timeout", require: "rack/timeout/base" gem "rack-timeout", require: "rack/timeout/base"
gem "parallel"
group :production do group :production do
gem 'unicorn', :platforms => :ruby gem 'unicorn', :platforms => :ruby

View File

@@ -561,6 +561,7 @@ DEPENDENCIES
newrelic_rpm newrelic_rpm
nokogiri nokogiri
oauth2 oauth2
parallel
pg pg
pry-byebug pry-byebug
pry-rails pry-rails

View File

@@ -137,45 +137,47 @@ class BulkUpdateRequestProcessor
# Process the bulk update request immediately. # Process the bulk update request immediately.
def process! def process!
commands.map do |command, *args| CurrentUser.scoped(User.system, "127.0.0.1") do
case command commands.map do |command, *args|
when :create_alias case command
TagAlias.approve!(antecedent_name: args[0], consequent_name: args[1], approver: approver, forum_topic: forum_topic) when :create_alias
TagAlias.approve!(antecedent_name: args[0], consequent_name: args[1], approver: approver, forum_topic: forum_topic)
when :create_implication when :create_implication
TagImplication.approve!(antecedent_name: args[0], consequent_name: args[1], approver: approver, forum_topic: forum_topic) TagImplication.approve!(antecedent_name: args[0], consequent_name: args[1], approver: approver, forum_topic: forum_topic)
when :remove_alias when :remove_alias
tag_alias = TagAlias.active.find_by!(antecedent_name: args[0], consequent_name: args[1]) tag_alias = TagAlias.active.find_by!(antecedent_name: args[0], consequent_name: args[1])
tag_alias.reject!(User.system) tag_alias.reject!(User.system)
when :remove_implication when :remove_implication
tag_implication = TagImplication.active.find_by!(antecedent_name: args[0], consequent_name: args[1]) tag_implication = TagImplication.active.find_by!(antecedent_name: args[0], consequent_name: args[1])
tag_implication.reject!(User.system) tag_implication.reject!(User.system)
when :mass_update when :mass_update
BulkUpdateRequestProcessor.mass_update(args[0], args[1]) BulkUpdateRequestProcessor.mass_update(args[0], args[1])
when :nuke when :nuke
BulkUpdateRequestProcessor.nuke(args[0]) BulkUpdateRequestProcessor.nuke(args[0])
when :rename when :rename
TagMover.new(args[0], args[1], user: User.system).move! TagMover.new(args[0], args[1], user: User.system).move!
when :change_category when :change_category
tag = Tag.find_or_create_by_name(args[0]) tag = Tag.find_or_create_by_name(args[0])
tag.update!(category: Tag.categories.value_for(args[1])) tag.update!(category: Tag.categories.value_for(args[1]))
else else
# should never happen # should never happen
raise Error, "Unknown command: #{command}" raise Error, "Unknown command: #{command}"
end
end end
end
bulk_update_request.update!(status: "approved") bulk_update_request.update!(status: "approved")
rescue StandardError rescue StandardError
bulk_update_request.update!(status: "failed") bulk_update_request.update!(status: "failed")
raise raise
end
end end
# The list of tags in the script. Used for search BURs by tag. # The list of tags in the script. Used for search BURs by tag.
@@ -255,7 +257,7 @@ class BulkUpdateRequestProcessor
normalized_consequent = PostQueryBuilder.new(consequent).parse_tag_edit normalized_consequent = PostQueryBuilder.new(consequent).parse_tag_edit
CurrentUser.scoped(user) do CurrentUser.scoped(user) do
Post.anon_tag_match(normalized_antecedent.join(" ")).find_each do |post| Post.anon_tag_match(normalized_antecedent.join(" ")).reorder(nil).parallel_each do |post|
post.with_lock do post.with_lock do
tags = (post.tag_array - normalized_antecedent + normalized_consequent).join(" ") tags = (post.tag_array - normalized_antecedent + normalized_consequent).join(" ")
post.update(tag_string: tags) post.update(tag_string: tags)

View File

@@ -69,7 +69,7 @@ class TagMover
# Retag the posts from the old tag to the new tag. # Retag the posts from the old tag to the new tag.
def move_posts! def move_posts!
Post.raw_tag_match(old_tag.name).find_each do |post| Post.raw_tag_match(old_tag.name).reorder(nil).parallel_each do |post|
post.lock! post.lock!
post.remove_tag(old_tag.name) post.remove_tag(old_tag.name)
post.add_tag(new_tag.name) post.add_tag(new_tag.name)

View File

@@ -166,6 +166,33 @@ class ApplicationRecord < ActiveRecord::Base
end end
end end
concerning :ConcurrencyMethods do
  class_methods do
    # Yield every record in the current scope to the given block, spreading
    # the work across parallel workers.
    #
    # Records are loaded with `find_in_batches`, and each batch is handed to
    # `Parallel.each` together with the requested worker counts.
    #
    # @param batch_size [Integer] number of records loaded per batch
    # @param in_processes [Integer, nil] process count passed to `Parallel.each`
    # @param in_threads [Integer, nil] thread count passed to `Parallel.each`
    # @yieldparam record a single record from the current batch
    def parallel_each(batch_size: 1000, in_processes: 4, in_threads: nil)
      # XXX Processes can't see each other's database transactions, so in the
      # test environment the requested counts are ignored and two threads are
      # used instead.
      in_processes, in_threads = nil, 2 if Rails.env.test?

      # Capture the caller's identity before handing work to the workers.
      user = CurrentUser.user
      ip_addr = CurrentUser.ip_addr

      find_in_batches(batch_size: batch_size, error_on_ignore: true) do |records|
        Parallel.each(records, in_processes: in_processes, in_threads: in_threads) do |record|
          # XXX The current user is a thread-local variable, so in threaded
          # mode a child thread does not inherit it from the parent. Set it
          # explicitly around every yielded record.
          CurrentUser.scoped(user, ip_addr) { yield record }
        end
      end
    end
  end
end
def warnings def warnings
@warnings ||= ActiveModel::Errors.new(self) @warnings ||= ActiveModel::Errors.new(self)
end end

View File

@@ -146,7 +146,7 @@ class TagImplication < TagRelationship
def update_posts! def update_posts!
CurrentUser.scoped(User.system) do CurrentUser.scoped(User.system) do
Post.system_tag_match("#{antecedent_name} -#{consequent_name}").find_each do |post| Post.system_tag_match("#{antecedent_name} -#{consequent_name}").reorder(nil).parallel_each do |post|
post.lock! post.lock!
post.save! post.save!
end end

View File

@@ -21,4 +21,28 @@ class ApplicationRecordTest < ActiveSupport::TestCase
assert_equal(@tags.reverse, Tag.search(updated_at: ">=#{@tags.first.updated_at}")) assert_equal(@tags.reverse, Tag.search(updated_at: ">=#{@tags.first.updated_at}"))
end end
end end
context "ApplicationRecord#parallel_each" do
  context "in threaded mode" do
    # Checks that the caller's CurrentUser and IP address are visible inside
    # the parallel_each block, and that a nested CurrentUser.scoped call
    # overrides them only for the duration of its own block.
    #
    # NOTE(review): the assertions only execute if at least one Tag record
    # exists when this test runs -- confirm the surrounding setup creates some.
    should "set CurrentUser correctly" do
      @user1 = create(:user)
      @user2 = create(:user)

      # Asserts both halves of the current identity in one call.
      assert_current = lambda do |user, ip_addr|
        assert_equal(user, CurrentUser.user)
        assert_equal(ip_addr, CurrentUser.ip_addr)
      end

      CurrentUser.scoped(@user1, "1.1.1.1") do
        Tag.parallel_each do |_tag|
          assert_current.call(@user1, "1.1.1.1")

          CurrentUser.scoped(@user2, "2.2.2.2") do
            assert_current.call(@user2, "2.2.2.2")
          end

          # The outer identity must be restored once the nested scope exits.
          assert_current.call(@user1, "1.1.1.1")
        end
      end
    end
  end
end
end end

View File

@@ -266,6 +266,7 @@ class BulkUpdateRequestTest < ActiveSupport::TestCase
should "update the tags" do should "update the tags" do
assert_equal("bar", @post.reload.tag_string) assert_equal("bar", @post.reload.tag_string)
assert_equal("approved", @bur.reload.status) assert_equal("approved", @bur.reload.status)
assert_equal(User.system, @post.versions.last.updater)
end end
should "be case-sensitive" do should "be case-sensitive" do
@@ -288,6 +289,7 @@ class BulkUpdateRequestTest < ActiveSupport::TestCase
should "rename the tags" do should "rename the tags" do
assert_equal("bar blah", @post.reload.tag_string) assert_equal("bar blah", @post.reload.tag_string)
assert_equal("approved", @bur.reload.status) assert_equal("approved", @bur.reload.status)
assert_equal(User.system, @post.versions.last.updater)
end end
should "move the tag's artist entry and wiki page" do should "move the tag's artist entry and wiki page" do
@@ -351,6 +353,7 @@ class BulkUpdateRequestTest < ActiveSupport::TestCase
assert_equal("foo", @post.reload.tag_string) assert_equal("foo", @post.reload.tag_string)
assert_equal("approved", @bur.reload.status) assert_equal("approved", @bur.reload.status)
assert_equal(User.system, @post.versions.last.updater)
end end
should "remove implications" do should "remove implications" do
@@ -370,6 +373,7 @@ class BulkUpdateRequestTest < ActiveSupport::TestCase
assert_equal([], @pool.post_ids) assert_equal([], @pool.post_ids)
assert_equal("approved", @bur.reload.status) assert_equal("approved", @bur.reload.status)
assert_equal(User.system, @pool.versions.last.updater)
end end
end end
@@ -514,6 +518,10 @@ class BulkUpdateRequestTest < ActiveSupport::TestCase
assert_equal("approved", @bur.reload.status) assert_equal("approved", @bur.reload.status)
end end
should "update the post as DanbooruBot" do
assert_equal(User.system, @post.versions.last.updater)
end
should "set the BUR as failed if there is an unexpected error during processing" do should "set the BUR as failed if there is an unexpected error during processing" do
@bur = create(:bulk_update_request, script: "alias one -> two") @bur = create(:bulk_update_request, script: "alias one -> two")
TagAlias.any_instance.stubs(:process!).raises(RuntimeError.new("oh no")) TagAlias.any_instance.stubs(:process!).raises(RuntimeError.new("oh no"))

View File

@@ -142,6 +142,8 @@ class TagAliasTest < ActiveSupport::TestCase
assert_equal("bbb ccc", post1.reload.tag_string) assert_equal("bbb ccc", post1.reload.tag_string)
assert_equal("ccc ddd", post2.reload.tag_string) assert_equal("ccc ddd", post2.reload.tag_string)
assert_equal(User.system, post1.versions.last.updater)
assert_equal(CurrentUser.user, post2.versions.last.updater)
end end
should "not validate for transitive relations" do should "not validate for transitive relations" do

View File

@@ -124,6 +124,8 @@ class TagImplicationTest < ActiveSupport::TestCase
assert_equal("sword weapon", p1.reload.tag_string) assert_equal("sword weapon", p1.reload.tag_string)
assert_equal("sword weapon", p2.reload.tag_string) assert_equal("sword weapon", p2.reload.tag_string)
assert_equal(User.system, p1.versions.last.updater)
assert_equal(CurrentUser.user, p2.versions.last.updater)
end end
context "when calculating implied tags" do context "when calculating implied tags" do