BURs: process bulk updates in dedicated job queue.
Add a dedicated queue for bulk update requests and process it using a single worker. This prevents bulk updates from consuming all available workers and preventing other job types from running. This also effectively serializes bulk updates so that they're processed one-at-a-time instead of in parallel. This will be slower overall but may avoid some of the issues with indeterminate update order under parallel updates.
This commit is contained in:
@@ -1,6 +1,5 @@
|
|||||||
class ProcessTagAliasJob < ApplicationJob
|
class ProcessTagAliasJob < ApplicationJob
|
||||||
queue_as :default
|
queue_as :bulk_update
|
||||||
queue_with_priority 20
|
|
||||||
|
|
||||||
def perform(tag_alias, update_topic: true)
|
def perform(tag_alias, update_topic: true)
|
||||||
tag_alias.process!(update_topic: update_topic)
|
tag_alias.process!(update_topic: update_topic)
|
||||||
|
|||||||
@@ -1,6 +1,5 @@
|
|||||||
class ProcessTagImplicationJob < ApplicationJob
|
class ProcessTagImplicationJob < ApplicationJob
|
||||||
queue_as :default
|
queue_as :bulk_update
|
||||||
queue_with_priority 20
|
|
||||||
|
|
||||||
def perform(tag_implication, update_topic: true)
|
def perform(tag_implication, update_topic: true)
|
||||||
tag_implication.process!(update_topic: update_topic)
|
tag_implication.process!(update_topic: update_topic)
|
||||||
|
|||||||
@@ -1,7 +1,7 @@
|
|||||||
class TagBatchChangeJob < ApplicationJob
|
class TagBatchChangeJob < ApplicationJob
|
||||||
class Error < Exception; end
|
class Error < Exception; end
|
||||||
|
|
||||||
queue_as :default
|
queue_as :bulk_update
|
||||||
|
|
||||||
def perform(antecedent, consequent, updater, updater_ip_addr)
|
def perform(antecedent, consequent, updater, updater_ip_addr)
|
||||||
raise Error.new("antecedent is missing") if antecedent.blank?
|
raise Error.new("antecedent is missing") if antecedent.blank?
|
||||||
|
|||||||
@@ -4,6 +4,8 @@ namespace :delayed_job do
|
|||||||
on roles(:worker) do
|
on roles(:worker) do
|
||||||
within current_path do
|
within current_path do
|
||||||
with rails_env: fetch(:rails_env) do
|
with rails_env: fetch(:rails_env) do
|
||||||
|
execute :"systemd-run", "--user --collect --slice delayed_job --unit delayed_job.bulk_update -E RAILS_ENV=$RAILS_ENV -p WorkingDirectory=$PWD -p Restart=always #{bundle} exec script/delayed_job --queues=bulk_update run"
|
||||||
|
|
||||||
fetch(:delayed_job_workers, 16).times do |n|
|
fetch(:delayed_job_workers, 16).times do |n|
|
||||||
bundle = SSHKit.config.command_map[:bundle]
|
bundle = SSHKit.config.command_map[:bundle]
|
||||||
execute :"systemd-run", "--user --collect --slice delayed_job --unit delayed_job.#{n} -E RAILS_ENV=$RAILS_ENV -p WorkingDirectory=$PWD -p Restart=always #{bundle} exec script/delayed_job --queues=default run"
|
execute :"systemd-run", "--user --collect --slice delayed_job --unit delayed_job.#{n} -E RAILS_ENV=$RAILS_ENV -p WorkingDirectory=$PWD -p Restart=always #{bundle} exec script/delayed_job --queues=default run"
|
||||||
|
|||||||
Reference in New Issue
Block a user