From 3bcc503cf7372013b5e8da5ada42292a1328f69e Mon Sep 17 00:00:00 2001 From: evazion Date: Fri, 27 Dec 2019 12:11:47 -0600 Subject: [PATCH] BURs: process bulk updates in dedicated job queue. Add a dedicated queue for bulk update requests and process it using a single worker. This prevents bulk updates from consuming all available workers and preventing other job types from running. This also effectively serializes bulk updates so that they're processed one-at-a-time instead of in parallel. This will be slower overall but may avoid some of the issues with indeterminate update order under parallel updates. --- app/jobs/process_tag_alias_job.rb | 3 +-- app/jobs/process_tag_implication_job.rb | 3 +-- app/jobs/tag_batch_change_job.rb | 2 +- lib/capistrano/tasks/delayed_job.rake | 2 ++ 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/app/jobs/process_tag_alias_job.rb b/app/jobs/process_tag_alias_job.rb index 28b636d94..4a6881565 100644 --- a/app/jobs/process_tag_alias_job.rb +++ b/app/jobs/process_tag_alias_job.rb @@ -1,6 +1,5 @@ class ProcessTagAliasJob < ApplicationJob - queue_as :default - queue_with_priority 20 + queue_as :bulk_update def perform(tag_alias, update_topic: true) tag_alias.process!(update_topic: update_topic) diff --git a/app/jobs/process_tag_implication_job.rb b/app/jobs/process_tag_implication_job.rb index f221b699b..330f7acf3 100644 --- a/app/jobs/process_tag_implication_job.rb +++ b/app/jobs/process_tag_implication_job.rb @@ -1,6 +1,5 @@ class ProcessTagImplicationJob < ApplicationJob - queue_as :default - queue_with_priority 20 + queue_as :bulk_update def perform(tag_implication, update_topic: true) tag_implication.process!(update_topic: update_topic) diff --git a/app/jobs/tag_batch_change_job.rb b/app/jobs/tag_batch_change_job.rb index 621e5ac60..3fed044e8 100644 --- a/app/jobs/tag_batch_change_job.rb +++ b/app/jobs/tag_batch_change_job.rb @@ -1,7 +1,7 @@ class TagBatchChangeJob < ApplicationJob class Error < Exception; end - queue_as :default + queue_as :bulk_update def perform(antecedent, consequent, updater, updater_ip_addr) raise Error.new("antecedent is missing") if antecedent.blank? diff --git a/lib/capistrano/tasks/delayed_job.rake b/lib/capistrano/tasks/delayed_job.rake index d5051b721..d21902aa7 100644 --- a/lib/capistrano/tasks/delayed_job.rake +++ b/lib/capistrano/tasks/delayed_job.rake @@ -4,6 +4,8 @@ namespace :delayed_job do on roles(:worker) do within current_path do with rails_env: fetch(:rails_env) do + execute :"systemd-run", "--user --collect --slice delayed_job --unit delayed_job.bulk_update -E RAILS_ENV=$RAILS_ENV -p WorkingDirectory=$PWD -p Restart=always #{bundle} exec script/delayed_job --queues=bulk_update run" + fetch(:delayed_job_workers, 16).times do |n| bundle = SSHKit.config.command_map[:bundle] execute :"systemd-run", "--user --collect --slice delayed_job --unit delayed_job.#{n} -E RAILS_ENV=$RAILS_ENV -p WorkingDirectory=$PWD -p Restart=always #{bundle} exec script/delayed_job --queues=default run"