BURs: process BURs sequentially in a single job.

Change the way BURs are processed. Before, we spawned a background job
for each line of the BUR, then processed each job sequentially. Now, we
process the entire BUR sequentially in a single background job.

This means that:

* BURs are truly sequential now. Before certain things like removing
  aliases weren't actually performed in a background job, so they were
  performed out-of-order before everything else in the BUR.

* Before, if an alias or implication line failed, then subsequent alias
  or implication lines would still be processed. This was because each
  alias or implication line was queued as a separate job, so a failure
  of one job didn't block another. Now, if any alias or implication
  fails, the entire BUR will fail and stop processing after that line.
  This may be good or bad, depending on whether we actually need the BUR
  to be processed in order or not.

* Before, BURs were processed inside a database transaction (except for the
  actual updating of posts). Now they're not. This is because we can't
  afford to hold transactions open while processing long-running aliases
  or implications. This means that if BUR fails in the middle when it is
  initially approved, it will be left in a half-complete state. Before
  it would be rolled back and left in a pending state with no changes
  performed.

* Before, only one BUR at a time could be processed. If multiple BURs
  were approved at the same time, then they would queue up and be
  processed one at a time. Now, multiple BURs can be processed at the
  same time. This may be undesirable when processing large BURs, or BURs
  that must be approved in a specific order.

* Before, large tag category changes could time out. This was because
  they weren't actually performed in a background job. Now they are, so
  they shouldn't time out.
This commit is contained in:
evazion
2021-09-19 18:36:12 -05:00
parent 96c5c346ad
commit 9ba84efc07
9 changed files with 87 additions and 99 deletions

View File

@@ -0,0 +1,11 @@
# A job that applies a bulk update request after it is approved.
#
# @see {BulkUpdateRequestProcessor}
# @see {BulkUpdateRequest}
class ProcessBulkUpdateRequestJob < ApplicationJob
retry_on Exception, attempts: 0
def perform(bulk_update_request)
bulk_update_request.processor.process!
end
end

View File

@@ -1,13 +0,0 @@
# A job that processes a single tag alias or implication when a bulk update
# request is approved. One job per alias or implication is spawned. Jobs are
# processed sequentially in the `bulk_update` queue.
class ProcessTagRelationshipJob < ApplicationJob
queue_as :bulk_update
retry_on Exception, attempts: 0
def perform(class_name:, approver:, antecedent_name:, consequent_name:, forum_topic: nil)
relation_class = Kernel.const_get(class_name)
tag_relationship = relation_class.create!(creator: approver, approver: approver, antecedent_name: antecedent_name, consequent_name: consequent_name, forum_topic: forum_topic)
tag_relationship.process!
end
end

View File

@@ -1,23 +0,0 @@
# A job that performs a mass update or tag nuke operation in a bulk update
# request. Jobs in the `bulk_update` queue are processed sequentially.
class TagBatchChangeJob < ApplicationJob
queue_as :bulk_update
def perform(antecedent, consequent)
normalized_antecedent = PostQueryBuilder.new(antecedent).split_query
normalized_consequent = PostQueryBuilder.new(consequent).parse_tag_edit
CurrentUser.scoped(User.system) do
migrate_posts(normalized_antecedent, normalized_consequent)
end
end
def migrate_posts(normalized_antecedent, normalized_consequent)
::Post.system_tag_match(normalized_antecedent.join(" ")).find_each do |post|
post.with_lock do
tags = (post.tag_array - normalized_antecedent + normalized_consequent).join(" ")
post.update(tag_string: tags)
end
end
end
end

View File

@@ -1,9 +0,0 @@
# A job that performs a tag rename or alias operation in a bulk update request.
# Jobs in the `bulk_update` queue are processed sequentially.
class TagRenameJob < ApplicationJob
queue_as :bulk_update
def perform(old_tag_name, new_tag_name)
TagMover.new(old_tag_name, new_tag_name, user: User.system).move!
end
end

View File

@@ -16,11 +16,11 @@ class BulkUpdateRequestProcessor
attr_reader :bulk_update_request attr_reader :bulk_update_request
delegate :script, :forum_topic, to: :bulk_update_request delegate :script, :forum_topic, :approver, to: :bulk_update_request
validate :validate_script validate :validate_script
validate :validate_script_length validate :validate_script_length
# @param bulk_update_request [String] the BUR # @param bulk_update_request [BulkUpdateRequest] the BUR
def initialize(bulk_update_request) def initialize(bulk_update_request)
@bulk_update_request = bulk_update_request @bulk_update_request = bulk_update_request
end end
@@ -130,50 +130,45 @@ class BulkUpdateRequestProcessor
end end
end end
# Apply the script. # Schedule the bulk update request to be processed later, in the background.
# @param approver [User] the approver of the request def process_later!
def process!(approver) ProcessBulkUpdateRequestJob.perform_later(bulk_update_request)
ActiveRecord::Base.transaction do end
commands.map do |command, *args|
case command
when :create_alias
TagAlias.approve!(antecedent_name: args[0], consequent_name: args[1], approver: approver, forum_topic: forum_topic)
when :create_implication # Process the bulk update request immediately.
TagImplication.approve!(antecedent_name: args[0], consequent_name: args[1], approver: approver, forum_topic: forum_topic) def process!
commands.map do |command, *args|
case command
when :create_alias
TagAlias.approve!(antecedent_name: args[0], consequent_name: args[1], approver: approver, forum_topic: forum_topic)
when :remove_alias when :create_implication
tag_alias = TagAlias.active.find_by!(antecedent_name: args[0], consequent_name: args[1]) TagImplication.approve!(antecedent_name: args[0], consequent_name: args[1], approver: approver, forum_topic: forum_topic)
tag_alias.reject!(User.system)
when :remove_implication when :remove_alias
tag_implication = TagImplication.active.find_by!(antecedent_name: args[0], consequent_name: args[1]) tag_alias = TagAlias.active.find_by!(antecedent_name: args[0], consequent_name: args[1])
tag_implication.reject!(User.system) tag_alias.reject!(User.system)
when :mass_update when :remove_implication
TagBatchChangeJob.perform_later(args[0], args[1]) tag_implication = TagImplication.active.find_by!(antecedent_name: args[0], consequent_name: args[1])
tag_implication.reject!(User.system)
when :nuke when :mass_update
# Reject existing implications from any other tag to the one we're nuking BulkUpdateRequestProcessor.mass_update(args[0], args[1])
# otherwise the tag won't be removed from posts that have those other tags
if PostQueryBuilder.new(args[0]).is_simple_tag?
TagImplication.active.where(consequent_name: args[0]).each { |ti| ti.reject!(User.system) }
TagImplication.active.where(antecedent_name: args[0]).each { |ti| ti.reject!(User.system) }
end
TagBatchChangeJob.perform_later(args[0], "-#{args[0]}") when :nuke
BulkUpdateRequestProcessor.nuke(args[0])
when :rename when :rename
TagRenameJob.perform_later(args[0], args[1]) TagMover.new(args[0], args[1], user: User.system).move!
when :change_category when :change_category
tag = Tag.find_or_create_by_name(args[0]) tag = Tag.find_or_create_by_name(args[0])
tag.update!(category: Tag.categories.value_for(args[1])) tag.update!(category: Tag.categories.value_for(args[1]))
else else
# should never happen # should never happen
raise Error, "Unknown command: #{command}" raise Error, "Unknown command: #{command}"
end
end end
end end
end end
@@ -239,6 +234,31 @@ class BulkUpdateRequestProcessor
end.join("\n") end.join("\n")
end end
def self.nuke(tag_name)
# Reject existing implications from any other tag to the one we're nuking
# otherwise the tag won't be removed from posts that have those other tags
if PostQueryBuilder.new(tag_name).is_simple_tag?
TagImplication.active.where(consequent_name: tag_name).each { |ti| ti.reject!(User.system) }
TagImplication.active.where(antecedent_name: tag_name).each { |ti| ti.reject!(User.system) }
end
mass_update(tag_name, "-#{tag_name}")
end
def self.mass_update(antecedent, consequent, user: User.system)
normalized_antecedent = PostQueryBuilder.new(antecedent).split_query
normalized_consequent = PostQueryBuilder.new(consequent).parse_tag_edit
CurrentUser.scoped(user) do
Post.anon_tag_match(normalized_antecedent.join(" ")).find_each do |post|
post.with_lock do
tags = (post.tag_array - normalized_antecedent + normalized_consequent).join(" ")
post.update(tag_string: tags)
end
end
end
end
# Tag move is allowed if: # Tag move is allowed if:
# #
# * The antecedent tag is an artist tag. # * The antecedent tag is an artist tag.

View File

@@ -62,8 +62,8 @@ class BulkUpdateRequest < ApplicationRecord
transaction do transaction do
CurrentUser.scoped(approver) do CurrentUser.scoped(approver) do
processor.validate!(:approval) processor.validate!(:approval)
processor.process!(approver)
update!(status: "approved", approver: approver) update!(status: "approved", approver: approver)
processor.process_later!
forum_updater.update("The #{bulk_update_request_link} (forum ##{forum_post.id}) has been approved by @#{approver.name}.") forum_updater.update("The #{bulk_update_request_link} (forum ##{forum_post.id}) has been approved by @#{approver.name}.")
end end
end end

View File

@@ -114,7 +114,8 @@ class TagRelationship < ApplicationRecord
end end
def self.approve!(antecedent_name:, consequent_name:, approver:, forum_topic: nil) def self.approve!(antecedent_name:, consequent_name:, approver:, forum_topic: nil)
ProcessTagRelationshipJob.perform_later(class_name: name, approver: approver, antecedent_name: antecedent_name, consequent_name: consequent_name, forum_topic: forum_topic) tag_relationship = create!(creator: approver, approver: approver, antecedent_name: antecedent_name, consequent_name: consequent_name, forum_topic: forum_topic)
tag_relationship.process!
end end
def self.model_restriction(table) def self.model_restriction(table)

View File

@@ -1,15 +0,0 @@
require "test_helper"
class TagBatchChangeJobTest < ActiveJob::TestCase
context "a tag batch change" do
setup do
@user = create(:moderator_user)
@post = create(:post, :tag_string => "aaa")
end
should "execute" do
TagBatchChangeJob.perform_now("aaa", "bbb")
assert_equal("bbb", @post.reload.tag_string)
end
end
end

View File

@@ -205,6 +205,14 @@ class BulkUpdateRequestTest < ActiveSupport::TestCase
assert_equal(false, @bur.valid?) assert_equal(false, @bur.valid?)
assert_equal(["Can't remove alias foo -> bar (alias doesn't exist)"], @bur.errors[:base]) assert_equal(["Can't remove alias foo -> bar (alias doesn't exist)"], @bur.errors[:base])
end end
should "be processed sequentially after the create alias command" do
@bur = create_bur!("create alias foo -> bar\nremove alias foo -> bar", @admin)
@alias = TagAlias.find_by(antecedent_name: "foo", consequent_name: "bar")
assert_equal(true, @alias.present?)
assert_equal(true, @alias.is_deleted?)
end
end end
context "the remove implication command" do context "the remove implication command" do
@@ -231,6 +239,14 @@ class BulkUpdateRequestTest < ActiveSupport::TestCase
assert_equal(false, @bur.valid?) assert_equal(false, @bur.valid?)
assert_equal(["Can't remove implication foo -> bar (implication doesn't exist)"], @bur.errors[:base]) assert_equal(["Can't remove implication foo -> bar (implication doesn't exist)"], @bur.errors[:base])
end end
should "be processed sequentially after the create implication command" do
@bur = create_bur!("imply foo -> bar\nunimply foo -> bar", @admin)
@ti = TagImplication.find_by(antecedent_name: "foo", consequent_name: "bar")
assert_equal(true, @ti.present?)
assert_equal(true, @ti.is_deleted?)
end
end end
context "the mass update command" do context "the mass update command" do