From 9ba84efc070fa48d14d3e52239e93c680f5b42d3 Mon Sep 17 00:00:00 2001 From: evazion Date: Sun, 19 Sep 2021 18:36:12 -0500 Subject: [PATCH] BURs: process BURs sequentially in a single job. Change the way BURs are processed. Before, we spawned a background job for each line of the BUR, then processed each job sequentially. Now, we process the entire BUR sequentially in a single background job. This means that: * BURs are truly sequential now. Before certain things like removing aliases weren't actually performed in a background job, so they were performed out-of-order before everything else in the BUR. * Before, if an alias or implication line failed, then subsequent alias or implication lines would still be processed. This was because each alias or implication line was queued as a separate job, so a failure of one job didn't block another. Now, if any alias or implication fails, the entire BUR will fail and stop processing after that line. This may be good or bad, depending on whether we actually need the BUR to be processed in order or not. * Before, BURs were processed inside a database transaction (except for the actual updating of posts). Now they're not. This is because we can't afford to hold transactions open while processing long-running aliases or implications. This means that if BUR fails in the middle when it is initially approved, it will be left in a half-complete state. Before it would be rolled back and left in a pending state with no changes performed. * Before, only one BUR at a time could be processed. If multiple BURs were approved at the same time, then they would queue up and be processed one at a time. Now, multiple BURs can be processed at the same time. This may be undesirable when processing large BURs, or BURs that must be approved in a specific order. * Before, large tag category changes could time out. This was because they weren't actually performed in a background job. Now they are, so they shouldn't time out. --- app/jobs/process_bulk_update_request_job.rb | 11 +++ app/jobs/process_tag_relationship_job.rb | 13 --- app/jobs/tag_batch_change_job.rb | 23 ----- app/jobs/tag_rename_job.rb | 9 -- app/logical/bulk_update_request_processor.rb | 94 ++++++++++++-------- app/models/bulk_update_request.rb | 2 +- app/models/tag_relationship.rb | 3 +- test/jobs/tag_batch_change_job_test.rb | 15 ---- test/unit/bulk_update_request_test.rb | 16 ++++ 9 files changed, 87 insertions(+), 99 deletions(-) create mode 100644 app/jobs/process_bulk_update_request_job.rb delete mode 100644 app/jobs/process_tag_relationship_job.rb delete mode 100644 app/jobs/tag_batch_change_job.rb delete mode 100644 app/jobs/tag_rename_job.rb delete mode 100644 test/jobs/tag_batch_change_job_test.rb diff --git a/app/jobs/process_bulk_update_request_job.rb b/app/jobs/process_bulk_update_request_job.rb new file mode 100644 index 000000000..19f46b9bc --- /dev/null +++ b/app/jobs/process_bulk_update_request_job.rb @@ -0,0 +1,11 @@ +# A job that applies a bulk update request after it is approved. +# +# @see {BulkUpdateRequestProcessor} +# @see {BulkUpdateRequest} +class ProcessBulkUpdateRequestJob < ApplicationJob + retry_on Exception, attempts: 0 + + def perform(bulk_update_request) + bulk_update_request.processor.process! + end +end diff --git a/app/jobs/process_tag_relationship_job.rb b/app/jobs/process_tag_relationship_job.rb deleted file mode 100644 index 78f9b4590..000000000 --- a/app/jobs/process_tag_relationship_job.rb +++ /dev/null @@ -1,13 +0,0 @@ -# A job that processes a single tag alias or implication when a bulk update -# request is approved. One job per alias or implication is spawned. Jobs are -# processed sequentially in the `bulk_update` queue. -class ProcessTagRelationshipJob < ApplicationJob - queue_as :bulk_update - retry_on Exception, attempts: 0 - - def perform(class_name:, approver:, antecedent_name:, consequent_name:, forum_topic: nil) - relation_class = Kernel.const_get(class_name) - tag_relationship = relation_class.create!(creator: approver, approver: approver, antecedent_name: antecedent_name, consequent_name: consequent_name, forum_topic: forum_topic) - tag_relationship.process! - end -end diff --git a/app/jobs/tag_batch_change_job.rb b/app/jobs/tag_batch_change_job.rb deleted file mode 100644 index f04aa4254..000000000 --- a/app/jobs/tag_batch_change_job.rb +++ /dev/null @@ -1,23 +0,0 @@ -# A job that performs a mass update or tag nuke operation in a bulk update -# request. Jobs in the `bulk_update` queue are processed sequentially. -class TagBatchChangeJob < ApplicationJob - queue_as :bulk_update - - def perform(antecedent, consequent) - normalized_antecedent = PostQueryBuilder.new(antecedent).split_query - normalized_consequent = PostQueryBuilder.new(consequent).parse_tag_edit - - CurrentUser.scoped(User.system) do - migrate_posts(normalized_antecedent, normalized_consequent) - end - end - - def migrate_posts(normalized_antecedent, normalized_consequent) - ::Post.system_tag_match(normalized_antecedent.join(" ")).find_each do |post| - post.with_lock do - tags = (post.tag_array - normalized_antecedent + normalized_consequent).join(" ") - post.update(tag_string: tags) - end - end - end -end diff --git a/app/jobs/tag_rename_job.rb b/app/jobs/tag_rename_job.rb deleted file mode 100644 index d640f9d97..000000000 --- a/app/jobs/tag_rename_job.rb +++ /dev/null @@ -1,9 +0,0 @@ -# A job that performs a tag rename or alias operation in a bulk update request. -# Jobs in the `bulk_update` queue are processed sequentially. -class TagRenameJob < ApplicationJob - queue_as :bulk_update - - def perform(old_tag_name, new_tag_name) - TagMover.new(old_tag_name, new_tag_name, user: User.system).move! - end -end diff --git a/app/logical/bulk_update_request_processor.rb b/app/logical/bulk_update_request_processor.rb index ed0fb3c61..65767f00d 100644 --- a/app/logical/bulk_update_request_processor.rb +++ b/app/logical/bulk_update_request_processor.rb @@ -16,11 +16,11 @@ class BulkUpdateRequestProcessor attr_reader :bulk_update_request - delegate :script, :forum_topic, to: :bulk_update_request + delegate :script, :forum_topic, :approver, to: :bulk_update_request validate :validate_script validate :validate_script_length - # @param bulk_update_request [String] the BUR + # @param bulk_update_request [BulkUpdateRequest] the BUR def initialize(bulk_update_request) @bulk_update_request = bulk_update_request end @@ -130,50 +130,45 @@ class BulkUpdateRequestProcessor end end - # Apply the script. - # @param approver [User] the approver of the request - def process!(approver) - ActiveRecord::Base.transaction do - commands.map do |command, *args| - case command - when :create_alias - TagAlias.approve!(antecedent_name: args[0], consequent_name: args[1], approver: approver, forum_topic: forum_topic) + # Schedule the bulk update request to be processed later, in the background. + def process_later! + ProcessBulkUpdateRequestJob.perform_later(bulk_update_request) + end - when :create_implication - TagImplication.approve!(antecedent_name: args[0], consequent_name: args[1], approver: approver, forum_topic: forum_topic) + # Process the bulk update request immediately. + def process! + commands.map do |command, *args| + case command + when :create_alias + TagAlias.approve!(antecedent_name: args[0], consequent_name: args[1], approver: approver, forum_topic: forum_topic) - when :remove_alias - tag_alias = TagAlias.active.find_by!(antecedent_name: args[0], consequent_name: args[1]) - tag_alias.reject!(User.system) + when :create_implication + TagImplication.approve!(antecedent_name: args[0], consequent_name: args[1], approver: approver, forum_topic: forum_topic) - when :remove_implication - tag_implication = TagImplication.active.find_by!(antecedent_name: args[0], consequent_name: args[1]) - tag_implication.reject!(User.system) + when :remove_alias + tag_alias = TagAlias.active.find_by!(antecedent_name: args[0], consequent_name: args[1]) + tag_alias.reject!(User.system) - when :mass_update - TagBatchChangeJob.perform_later(args[0], args[1]) + when :remove_implication + tag_implication = TagImplication.active.find_by!(antecedent_name: args[0], consequent_name: args[1]) + tag_implication.reject!(User.system) - when :nuke - # Reject existing implications from any other tag to the one we're nuking - # otherwise the tag won't be removed from posts that have those other tags - if PostQueryBuilder.new(args[0]).is_simple_tag? - TagImplication.active.where(consequent_name: args[0]).each { |ti| ti.reject!(User.system) } - TagImplication.active.where(antecedent_name: args[0]).each { |ti| ti.reject!(User.system) } - end + when :mass_update + BulkUpdateRequestProcessor.mass_update(args[0], args[1]) - TagBatchChangeJob.perform_later(args[0], "-#{args[0]}") + when :nuke + BulkUpdateRequestProcessor.nuke(args[0]) - when :rename - TagRenameJob.perform_later(args[0], args[1]) + when :rename + TagMover.new(args[0], args[1], user: User.system).move! - when :change_category - tag = Tag.find_or_create_by_name(args[0]) - tag.update!(category: Tag.categories.value_for(args[1])) + when :change_category + tag = Tag.find_or_create_by_name(args[0]) + tag.update!(category: Tag.categories.value_for(args[1])) - else - # should never happen - raise Error, "Unknown command: #{command}" - end + else + # should never happen + raise Error, "Unknown command: #{command}" end end end @@ -239,6 +234,31 @@ class BulkUpdateRequestProcessor end.join("\n") end + def self.nuke(tag_name) + # Reject existing implications from any other tag to the one we're nuking + # otherwise the tag won't be removed from posts that have those other tags + if PostQueryBuilder.new(tag_name).is_simple_tag? + TagImplication.active.where(consequent_name: tag_name).each { |ti| ti.reject!(User.system) } + TagImplication.active.where(antecedent_name: tag_name).each { |ti| ti.reject!(User.system) } + end + + mass_update(tag_name, "-#{tag_name}") + end + + def self.mass_update(antecedent, consequent, user: User.system) + normalized_antecedent = PostQueryBuilder.new(antecedent).split_query + normalized_consequent = PostQueryBuilder.new(consequent).parse_tag_edit + + CurrentUser.scoped(user) do + Post.anon_tag_match(normalized_antecedent.join(" ")).find_each do |post| + post.with_lock do + tags = (post.tag_array - normalized_antecedent + normalized_consequent).join(" ") + post.update(tag_string: tags) + end + end + end + end + # Tag move is allowed if: # # * The antecedent tag is an artist tag. diff --git a/app/models/bulk_update_request.rb b/app/models/bulk_update_request.rb index 95562a465..dc0ae4bb0 100644 --- a/app/models/bulk_update_request.rb +++ b/app/models/bulk_update_request.rb @@ -62,8 +62,8 @@ class BulkUpdateRequest < ApplicationRecord transaction do CurrentUser.scoped(approver) do processor.validate!(:approval) - processor.process!(approver) update!(status: "approved", approver: approver) + processor.process_later! forum_updater.update("The #{bulk_update_request_link} (forum ##{forum_post.id}) has been approved by @#{approver.name}.") end end diff --git a/app/models/tag_relationship.rb b/app/models/tag_relationship.rb index 60010a5a7..6cbd532c7 100644 --- a/app/models/tag_relationship.rb +++ b/app/models/tag_relationship.rb @@ -114,7 +114,8 @@ class TagRelationship < ApplicationRecord end def self.approve!(antecedent_name:, consequent_name:, approver:, forum_topic: nil) - ProcessTagRelationshipJob.perform_later(class_name: name, approver: approver, antecedent_name: antecedent_name, consequent_name: consequent_name, forum_topic: forum_topic) + tag_relationship = create!(creator: approver, approver: approver, antecedent_name: antecedent_name, consequent_name: consequent_name, forum_topic: forum_topic) + tag_relationship.process! end def self.model_restriction(table) diff --git a/test/jobs/tag_batch_change_job_test.rb b/test/jobs/tag_batch_change_job_test.rb deleted file mode 100644 index 3b44611ad..000000000 --- a/test/jobs/tag_batch_change_job_test.rb +++ /dev/null @@ -1,15 +0,0 @@ -require "test_helper" - -class TagBatchChangeJobTest < ActiveJob::TestCase - context "a tag batch change" do - setup do - @user = create(:moderator_user) - @post = create(:post, :tag_string => "aaa") - end - - should "execute" do - TagBatchChangeJob.perform_now("aaa", "bbb") - assert_equal("bbb", @post.reload.tag_string) - end - end -end diff --git a/test/unit/bulk_update_request_test.rb b/test/unit/bulk_update_request_test.rb index 5e93405c4..2ee22b05c 100644 --- a/test/unit/bulk_update_request_test.rb +++ b/test/unit/bulk_update_request_test.rb @@ -205,6 +205,14 @@ class BulkUpdateRequestTest < ActiveSupport::TestCase assert_equal(false, @bur.valid?) assert_equal(["Can't remove alias foo -> bar (alias doesn't exist)"], @bur.errors[:base]) end + + should "be processed sequentially after the create alias command" do + @bur = create_bur!("create alias foo -> bar\nremove alias foo -> bar", @admin) + + @alias = TagAlias.find_by(antecedent_name: "foo", consequent_name: "bar") + assert_equal(true, @alias.present?) + assert_equal(true, @alias.is_deleted?) + end end context "the remove implication command" do @@ -231,6 +239,14 @@ class BulkUpdateRequestTest < ActiveSupport::TestCase assert_equal(false, @bur.valid?) assert_equal(["Can't remove implication foo -> bar (implication doesn't exist)"], @bur.errors[:base]) end + + should "be processed sequentially after the create implication command" do + @bur = create_bur!("imply foo -> bar\nunimply foo -> bar", @admin) + + @ti = TagImplication.find_by(antecedent_name: "foo", consequent_name: "bar") + assert_equal(true, @ti.present?) + assert_equal(true, @ti.is_deleted?) + end end context "the mass update command" do