BURs: update posts in parallel.
When processing an alias, rename, implication, mass update, or nuke, update the affected posts in parallel. For example, when aliasing foo to bar, four worker processes retag the posts from foo to bar at the same time. The parallelism applies only within a single command: if a BUR contains two aliases, the aliases themselves are not processed in parallel; only the post updates belonging to each alias are.
This commit is contained in:
1
Gemfile
1
Gemfile
@@ -56,6 +56,7 @@ gem 'clockwork'
|
||||
gem 'puma-metrics'
|
||||
gem 'puma_worker_killer'
|
||||
gem "rack-timeout", require: "rack/timeout/base"
|
||||
gem "parallel"
|
||||
|
||||
group :production do
|
||||
gem 'unicorn', :platforms => :ruby
|
||||
|
||||
@@ -561,6 +561,7 @@ DEPENDENCIES
|
||||
newrelic_rpm
|
||||
nokogiri
|
||||
oauth2
|
||||
parallel
|
||||
pg
|
||||
pry-byebug
|
||||
pry-rails
|
||||
|
||||
@@ -137,45 +137,47 @@ class BulkUpdateRequestProcessor
|
||||
|
||||
# Process the bulk update request immediately.
|
||||
def process!
|
||||
commands.map do |command, *args|
|
||||
case command
|
||||
when :create_alias
|
||||
TagAlias.approve!(antecedent_name: args[0], consequent_name: args[1], approver: approver, forum_topic: forum_topic)
|
||||
CurrentUser.scoped(User.system, "127.0.0.1") do
|
||||
commands.map do |command, *args|
|
||||
case command
|
||||
when :create_alias
|
||||
TagAlias.approve!(antecedent_name: args[0], consequent_name: args[1], approver: approver, forum_topic: forum_topic)
|
||||
|
||||
when :create_implication
|
||||
TagImplication.approve!(antecedent_name: args[0], consequent_name: args[1], approver: approver, forum_topic: forum_topic)
|
||||
when :create_implication
|
||||
TagImplication.approve!(antecedent_name: args[0], consequent_name: args[1], approver: approver, forum_topic: forum_topic)
|
||||
|
||||
when :remove_alias
|
||||
tag_alias = TagAlias.active.find_by!(antecedent_name: args[0], consequent_name: args[1])
|
||||
tag_alias.reject!(User.system)
|
||||
when :remove_alias
|
||||
tag_alias = TagAlias.active.find_by!(antecedent_name: args[0], consequent_name: args[1])
|
||||
tag_alias.reject!(User.system)
|
||||
|
||||
when :remove_implication
|
||||
tag_implication = TagImplication.active.find_by!(antecedent_name: args[0], consequent_name: args[1])
|
||||
tag_implication.reject!(User.system)
|
||||
when :remove_implication
|
||||
tag_implication = TagImplication.active.find_by!(antecedent_name: args[0], consequent_name: args[1])
|
||||
tag_implication.reject!(User.system)
|
||||
|
||||
when :mass_update
|
||||
BulkUpdateRequestProcessor.mass_update(args[0], args[1])
|
||||
when :mass_update
|
||||
BulkUpdateRequestProcessor.mass_update(args[0], args[1])
|
||||
|
||||
when :nuke
|
||||
BulkUpdateRequestProcessor.nuke(args[0])
|
||||
when :nuke
|
||||
BulkUpdateRequestProcessor.nuke(args[0])
|
||||
|
||||
when :rename
|
||||
TagMover.new(args[0], args[1], user: User.system).move!
|
||||
when :rename
|
||||
TagMover.new(args[0], args[1], user: User.system).move!
|
||||
|
||||
when :change_category
|
||||
tag = Tag.find_or_create_by_name(args[0])
|
||||
tag.update!(category: Tag.categories.value_for(args[1]))
|
||||
when :change_category
|
||||
tag = Tag.find_or_create_by_name(args[0])
|
||||
tag.update!(category: Tag.categories.value_for(args[1]))
|
||||
|
||||
else
|
||||
# should never happen
|
||||
raise Error, "Unknown command: #{command}"
|
||||
else
|
||||
# should never happen
|
||||
raise Error, "Unknown command: #{command}"
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
bulk_update_request.update!(status: "approved")
|
||||
rescue StandardError
|
||||
bulk_update_request.update!(status: "failed")
|
||||
raise
|
||||
bulk_update_request.update!(status: "approved")
|
||||
rescue StandardError
|
||||
bulk_update_request.update!(status: "failed")
|
||||
raise
|
||||
end
|
||||
end
|
||||
|
||||
# The list of tags in the script. Used for search BURs by tag.
|
||||
@@ -255,7 +257,7 @@ class BulkUpdateRequestProcessor
|
||||
normalized_consequent = PostQueryBuilder.new(consequent).parse_tag_edit
|
||||
|
||||
CurrentUser.scoped(user) do
|
||||
Post.anon_tag_match(normalized_antecedent.join(" ")).find_each do |post|
|
||||
Post.anon_tag_match(normalized_antecedent.join(" ")).reorder(nil).parallel_each do |post|
|
||||
post.with_lock do
|
||||
tags = (post.tag_array - normalized_antecedent + normalized_consequent).join(" ")
|
||||
post.update(tag_string: tags)
|
||||
|
||||
@@ -69,7 +69,7 @@ class TagMover
|
||||
|
||||
# Retag the posts from the old tag to the new tag.
|
||||
def move_posts!
|
||||
Post.raw_tag_match(old_tag.name).find_each do |post|
|
||||
Post.raw_tag_match(old_tag.name).reorder(nil).parallel_each do |post|
|
||||
post.lock!
|
||||
post.remove_tag(old_tag.name)
|
||||
post.add_tag(new_tag.name)
|
||||
|
||||
@@ -166,6 +166,33 @@ class ApplicationRecord < ActiveRecord::Base
|
||||
end
|
||||
end
|
||||
|
||||
concerning :ConcurrencyMethods do
|
||||
class_methods do
|
||||
# Yield every record of this relation to the given block, spreading the
# records of each batch across parallel workers.
#
# Records are loaded one batch at a time with `find_in_batches`, and each
# batch is handed to `Parallel.each`.
#
# @param batch_size [Integer] number of records loaded per batch
# @param in_processes [Integer, nil] forwarded to `Parallel.each`
# @param in_threads [Integer, nil] forwarded to `Parallel.each`
# @yield [record] called once per record, with CurrentUser set to the
#   caller's user and IP address
def parallel_each(batch_size: 1000, in_processes: 4, in_threads: nil)
  # XXX Processes can't see each other's database transactions, so the
  # test environment always runs with two threads instead.
  in_processes, in_threads = nil, 2 if Rails.env.test?

  # Remember who the caller is before any worker is started.
  user = CurrentUser.user
  ip_addr = CurrentUser.ip_addr

  # NOTE(review): per the Rails batching docs, `error_on_ignore: true` raises
  # when the relation carries a custom order instead of silently dropping
  # it -- callers are expected to `reorder(nil)` first. Confirm against the
  # Rails version in use.
  find_in_batches(batch_size: batch_size, error_on_ignore: true) do |records|
    Parallel.each(records, in_processes: in_processes, in_threads: in_threads) do |record|
      # XXX The current user is a thread-local variable, so in threaded mode
      # a child thread does not inherit it from the parent. Set it
      # explicitly around every yielded record.
      CurrentUser.scoped(user, ip_addr) { yield record }
    end
  end
end
|
||||
end
|
||||
end
|
||||
|
||||
def warnings
|
||||
@warnings ||= ActiveModel::Errors.new(self)
|
||||
end
|
||||
|
||||
@@ -146,7 +146,7 @@ class TagImplication < TagRelationship
|
||||
|
||||
def update_posts!
|
||||
CurrentUser.scoped(User.system) do
|
||||
Post.system_tag_match("#{antecedent_name} -#{consequent_name}").find_each do |post|
|
||||
Post.system_tag_match("#{antecedent_name} -#{consequent_name}").reorder(nil).parallel_each do |post|
|
||||
post.lock!
|
||||
post.save!
|
||||
end
|
||||
|
||||
@@ -21,4 +21,28 @@ class ApplicationRecordTest < ActiveSupport::TestCase
|
||||
assert_equal(@tags.reverse, Tag.search(updated_at: ">=#{@tags.first.updated_at}"))
|
||||
end
|
||||
end
|
||||
|
||||
context "ApplicationRecord#parallel_each" do
  context "in threaded mode" do
    # Inside the parallel_each block the CurrentUser must be the caller's
    # user/IP, a nested CurrentUser.scoped must override it, and the
    # caller's values must be back once the nested scope ends.
    should "set CurrentUser correctly" do
      outer_user = create(:user)
      inner_user = create(:user)

      # Asserts both halves of the CurrentUser state at once.
      assert_current = lambda do |user, ip_addr|
        assert_equal(user, CurrentUser.user)
        assert_equal(ip_addr, CurrentUser.ip_addr)
      end

      CurrentUser.scoped(outer_user, "1.1.1.1") do
        Tag.parallel_each do |_tag|
          assert_current.call(outer_user, "1.1.1.1")

          CurrentUser.scoped(inner_user, "2.2.2.2") do
            assert_current.call(inner_user, "2.2.2.2")
          end

          assert_current.call(outer_user, "1.1.1.1")
        end
      end
    end
  end
end
|
||||
end
|
||||
|
||||
@@ -266,6 +266,7 @@ class BulkUpdateRequestTest < ActiveSupport::TestCase
|
||||
should "update the tags" do
|
||||
assert_equal("bar", @post.reload.tag_string)
|
||||
assert_equal("approved", @bur.reload.status)
|
||||
assert_equal(User.system, @post.versions.last.updater)
|
||||
end
|
||||
|
||||
should "be case-sensitive" do
|
||||
@@ -288,6 +289,7 @@ class BulkUpdateRequestTest < ActiveSupport::TestCase
|
||||
should "rename the tags" do
|
||||
assert_equal("bar blah", @post.reload.tag_string)
|
||||
assert_equal("approved", @bur.reload.status)
|
||||
assert_equal(User.system, @post.versions.last.updater)
|
||||
end
|
||||
|
||||
should "move the tag's artist entry and wiki page" do
|
||||
@@ -351,6 +353,7 @@ class BulkUpdateRequestTest < ActiveSupport::TestCase
|
||||
|
||||
assert_equal("foo", @post.reload.tag_string)
|
||||
assert_equal("approved", @bur.reload.status)
|
||||
assert_equal(User.system, @post.versions.last.updater)
|
||||
end
|
||||
|
||||
should "remove implications" do
|
||||
@@ -370,6 +373,7 @@ class BulkUpdateRequestTest < ActiveSupport::TestCase
|
||||
|
||||
assert_equal([], @pool.post_ids)
|
||||
assert_equal("approved", @bur.reload.status)
|
||||
assert_equal(User.system, @pool.versions.last.updater)
|
||||
end
|
||||
end
|
||||
|
||||
@@ -514,6 +518,10 @@ class BulkUpdateRequestTest < ActiveSupport::TestCase
|
||||
assert_equal("approved", @bur.reload.status)
|
||||
end
|
||||
|
||||
should "update the post as DanbooruBot" do
  # The newest version of the post must be attributed to the system user.
  latest_version = @post.versions.last
  assert_equal(User.system, latest_version.updater)
end
|
||||
|
||||
should "set the BUR as failed if there is an unexpected error during processing" do
|
||||
@bur = create(:bulk_update_request, script: "alias one -> two")
|
||||
TagAlias.any_instance.stubs(:process!).raises(RuntimeError.new("oh no"))
|
||||
|
||||
@@ -142,6 +142,8 @@ class TagAliasTest < ActiveSupport::TestCase
|
||||
|
||||
assert_equal("bbb ccc", post1.reload.tag_string)
|
||||
assert_equal("ccc ddd", post2.reload.tag_string)
|
||||
assert_equal(User.system, post1.versions.last.updater)
|
||||
assert_equal(CurrentUser.user, post2.versions.last.updater)
|
||||
end
|
||||
|
||||
should "not validate for transitive relations" do
|
||||
|
||||
@@ -124,6 +124,8 @@ class TagImplicationTest < ActiveSupport::TestCase
|
||||
|
||||
assert_equal("sword weapon", p1.reload.tag_string)
|
||||
assert_equal("sword weapon", p2.reload.tag_string)
|
||||
assert_equal(User.system, p1.versions.last.updater)
|
||||
assert_equal(CurrentUser.user, p2.versions.last.updater)
|
||||
end
|
||||
|
||||
context "when calculating implied tags" do
|
||||
|
||||
Reference in New Issue
Block a user