more fault tolerance for uploads
This commit is contained in:
@@ -14,7 +14,13 @@ class CurrentUser
|
|||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def self.as(user, &block)
|
def self.as(user_or_id, &block)
|
||||||
|
if user_or_id.is_a?(String) || user_or_id.is_a?(Integer)
|
||||||
|
user = User.find(user_or_id)
|
||||||
|
else
|
||||||
|
user = user_or_id
|
||||||
|
end
|
||||||
|
|
||||||
scoped(user, &block)
|
scoped(user, &block)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|||||||
@@ -15,7 +15,7 @@ class UploadService
|
|||||||
if post.nil?
|
if post.nil?
|
||||||
# this gets called from UploadsController#new so we need
|
# this gets called from UploadsController#new so we need
|
||||||
# to preprocess async
|
# to preprocess async
|
||||||
Preprocessor.new(source: url).delay(queue: "default").start!(CurrentUser.user.id)
|
Preprocessor.new(source: url).delay(priority: -1, queue: "default").delayed_start(CurrentUser.id)
|
||||||
end
|
end
|
||||||
|
|
||||||
begin
|
begin
|
||||||
@@ -27,7 +27,7 @@ class UploadService
|
|||||||
return [upload, post, source, normalized_url, remote_size]
|
return [upload, post, source, normalized_url, remote_size]
|
||||||
elsif file
|
elsif file
|
||||||
# this gets called via XHR so we can process sync
|
# this gets called via XHR so we can process sync
|
||||||
Preprocessor.new(file: file).start!((CurrentUser.user.id))
|
Preprocessor.new(file: file).delayed_start(CurrentUser.id)
|
||||||
end
|
end
|
||||||
|
|
||||||
return [upload]
|
return [upload]
|
||||||
@@ -199,7 +199,7 @@ class UploadService
|
|||||||
# in case this upload never finishes processing, we need to delete the
|
# in case this upload never finishes processing, we need to delete the
|
||||||
# distributed files in the future
|
# distributed files in the future
|
||||||
Danbooru.config.other_server_hosts.each do |host|
|
Danbooru.config.other_server_hosts.each do |host|
|
||||||
UploadService::Utils.delay(queue: host, run_at: 30.minutes.from_now).delete_file(upload.md5, upload.file_ext, upload.id)
|
UploadService::Utils.delay(priority: -1, queue: host, run_at: 30.minutes.from_now).delete_file(upload.md5, upload.file_ext, upload.id)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
@@ -311,7 +311,16 @@ class UploadService
|
|||||||
predecessor.present?
|
predecessor.present?
|
||||||
end
|
end
|
||||||
|
|
||||||
def start!(uploader_id)
|
def delayed_start(uploader_id)
|
||||||
|
CurrentUser.as(uploader_id) do
|
||||||
|
start!
|
||||||
|
end
|
||||||
|
|
||||||
|
rescue ActiveRecord::RecordNotUnique
|
||||||
|
return
|
||||||
|
end
|
||||||
|
|
||||||
|
def start!
|
||||||
if Utils.is_downloadable?(source)
|
if Utils.is_downloadable?(source)
|
||||||
CurrentUser.as_system do
|
CurrentUser.as_system do
|
||||||
if Post.tag_match("source:#{source}").where.not(id: original_post_id).exists?
|
if Post.tag_match("source:#{source}").where.not(id: original_post_id).exists?
|
||||||
@@ -331,29 +340,27 @@ class UploadService
|
|||||||
params[:rating] ||= "q"
|
params[:rating] ||= "q"
|
||||||
params[:tag_string] ||= "tagme"
|
params[:tag_string] ||= "tagme"
|
||||||
|
|
||||||
CurrentUser.as(User.find(uploader_id)) do
|
upload = Upload.create!(params)
|
||||||
upload = Upload.create!(params)
|
begin
|
||||||
begin
|
upload.update(status: "preprocessing")
|
||||||
upload.update(status: "preprocessing")
|
|
||||||
|
|
||||||
if source.present?
|
if source.present?
|
||||||
file = Utils.download_for_upload(source, upload)
|
file = Utils.download_for_upload(source, upload)
|
||||||
elsif params[:file].present?
|
elsif params[:file].present?
|
||||||
file = params[:file]
|
file = params[:file]
|
||||||
end
|
|
||||||
|
|
||||||
Utils.process_file(upload, file)
|
|
||||||
|
|
||||||
upload.rating = params[:rating]
|
|
||||||
upload.tag_string = params[:tag_string]
|
|
||||||
upload.status = "preprocessed"
|
|
||||||
upload.save!
|
|
||||||
rescue Exception => x
|
|
||||||
upload.update(status: "error: #{x.class} - #{x.message}", backtrace: x.backtrace.join("\n"))
|
|
||||||
end
|
end
|
||||||
|
|
||||||
return upload
|
Utils.process_file(upload, file)
|
||||||
|
|
||||||
|
upload.rating = params[:rating]
|
||||||
|
upload.tag_string = params[:tag_string]
|
||||||
|
upload.status = "preprocessed"
|
||||||
|
upload.save!
|
||||||
|
rescue Exception => x
|
||||||
|
upload.update(status: "error: #{x.class} - #{x.message}", backtrace: x.backtrace.join("\n"))
|
||||||
end
|
end
|
||||||
|
|
||||||
|
return upload
|
||||||
end
|
end
|
||||||
|
|
||||||
def finish!(upload = nil)
|
def finish!(upload = nil)
|
||||||
@@ -442,7 +449,7 @@ class UploadService
|
|||||||
file: replacement.replacement_file,
|
file: replacement.replacement_file,
|
||||||
original_post_id: post.id
|
original_post_id: post.id
|
||||||
)
|
)
|
||||||
upload = preprocessor.start!(CurrentUser.id)
|
upload = preprocessor.start!
|
||||||
return if upload.is_errored?
|
return if upload.is_errored?
|
||||||
upload = preprocessor.finish!(upload)
|
upload = preprocessor.finish!(upload)
|
||||||
return if upload.is_errored?
|
return if upload.is_errored?
|
||||||
@@ -521,7 +528,7 @@ class UploadService
|
|||||||
@params = params
|
@params = params
|
||||||
end
|
end
|
||||||
|
|
||||||
def scoped_start(uploader_id)
|
def delayed_start(uploader_id)
|
||||||
CurrentUser.as(uploader_id) do
|
CurrentUser.as(uploader_id) do
|
||||||
start!
|
start!
|
||||||
end
|
end
|
||||||
@@ -533,7 +540,7 @@ class UploadService
|
|||||||
preprocessor = Preprocessor.new(params)
|
preprocessor = Preprocessor.new(params)
|
||||||
|
|
||||||
if preprocessor.in_progress?
|
if preprocessor.in_progress?
|
||||||
delay(queue: "default", run_at: 5.seconds.from_now).scoped_start(CurrentUser.id)
|
delay(queue: "default", priority: -1, run_at: 5.seconds.from_now).delayed_start(CurrentUser.id)
|
||||||
return preprocessor.predecessor
|
return preprocessor.predecessor
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|||||||
@@ -65,7 +65,7 @@ class Upload < ApplicationRecord
|
|||||||
serialize :context, JSON
|
serialize :context, JSON
|
||||||
|
|
||||||
def initialize_attributes
|
def initialize_attributes
|
||||||
self.uploader_id = CurrentUser.user.id
|
self.uploader_id = CurrentUser.id
|
||||||
self.uploader_ip_addr = CurrentUser.ip_addr
|
self.uploader_ip_addr = CurrentUser.ip_addr
|
||||||
self.server = Danbooru.config.server_host
|
self.server = Danbooru.config.server_host
|
||||||
end
|
end
|
||||||
|
|||||||
@@ -348,7 +348,7 @@ class UploadServiceTest < ActiveSupport::TestCase
|
|||||||
|
|
||||||
should "work for a jpeg" do
|
should "work for a jpeg" do
|
||||||
@service = subject.new(source: @jpeg)
|
@service = subject.new(source: @jpeg)
|
||||||
@upload = @service.start!(CurrentUser.id)
|
@upload = @service.start!
|
||||||
assert_equal("preprocessed", @upload.status)
|
assert_equal("preprocessed", @upload.status)
|
||||||
assert_not_nil(@upload.md5)
|
assert_not_nil(@upload.md5)
|
||||||
assert_equal("jpg", @upload.file_ext)
|
assert_equal("jpg", @upload.file_ext)
|
||||||
@@ -361,7 +361,7 @@ class UploadServiceTest < ActiveSupport::TestCase
|
|||||||
|
|
||||||
should "work for an ugoira" do
|
should "work for an ugoira" do
|
||||||
@service = subject.new(source: @ugoira)
|
@service = subject.new(source: @ugoira)
|
||||||
@upload = @service.start!(CurrentUser.id)
|
@upload = @service.start!
|
||||||
assert_equal("preprocessed", @upload.status)
|
assert_equal("preprocessed", @upload.status)
|
||||||
assert_not_nil(@upload.md5)
|
assert_not_nil(@upload.md5)
|
||||||
assert_equal("zip", @upload.file_ext)
|
assert_equal("zip", @upload.file_ext)
|
||||||
@@ -373,7 +373,7 @@ class UploadServiceTest < ActiveSupport::TestCase
|
|||||||
|
|
||||||
should "work for a video" do
|
should "work for a video" do
|
||||||
@service = subject.new(source: @video)
|
@service = subject.new(source: @video)
|
||||||
@upload = @service.start!(CurrentUser.id)
|
@upload = @service.start!
|
||||||
assert_equal("preprocessed", @upload.status)
|
assert_equal("preprocessed", @upload.status)
|
||||||
assert_not_nil(@upload.md5)
|
assert_not_nil(@upload.md5)
|
||||||
assert_equal("mp4", @upload.file_ext)
|
assert_equal("mp4", @upload.file_ext)
|
||||||
@@ -390,7 +390,7 @@ class UploadServiceTest < ActiveSupport::TestCase
|
|||||||
|
|
||||||
should "leave the upload in an error state" do
|
should "leave the upload in an error state" do
|
||||||
@service = subject.new(source: @video)
|
@service = subject.new(source: @video)
|
||||||
@upload = @service.start!(CurrentUser.id)
|
@upload = @service.start!
|
||||||
assert_match(/error:/, @upload.status)
|
assert_match(/error:/, @upload.status)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|||||||
Reference in New Issue
Block a user