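# Database-backed job queue. Each row records a task_type, a JSON-encoded
# parameter hash (data_as_json), a status, and an optional repeat_count.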
class Job < ActiveRecord::Base
  CATEGORIES = %w(mass_tag_edit approve_tag_alias approve_tag_implication calculate_tag_subscriptions calculate_related_tags s3_backup upload_processing)
  STATUSES = %w(pending processing finished error)

  validates_inclusion_of :task_type, :in => CATEGORIES
  validates_inclusion_of :status, :in => STATUSES

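  # Job parameters are stored as JSON text in the data_as_json column;
  # these accessors expose them as a plain Hash.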
  def data
    JSON.parse(data_as_json)
  end

  def data=(text)
    self.data_as_json = text.to_json
  end

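  # Runs the job: marks it as processing, dispatches to the matching
  # execute_<task_type> method, and records the outcome. A positive
  # repeat_count is decremented and the job re-queued as pending until it
  # reaches zero; a negative repeat_count repeats indefinitely. Any other
  # exception sets the status to "error" and saves the message in
  # status_message.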
  def execute!
    if repeat_count > 0
      count = repeat_count - 1
    else
      count = repeat_count
    end

    begin
      execute_sql("SET statement_timeout = 0")
      update_attribute(:status, "processing")
      __send__("execute_#{task_type}")

      if count == 0
        update_attribute(:status, "finished")
      else
        update_attributes(:status => "pending", :repeat_count => count)
      end
    rescue SystemExit
      # Leave the job pending and let the process shut down.
      update_attribute(:status, "pending")
      raise
    rescue Exception => x
      update_attributes(:status => "error", :status_message => "#{x.class}: #{x}")
    end
  end

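  # Processes every upload that is still pending.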
  def execute_upload_processing
    Upload.where("status = ?", "pending").each do |upload|
      upload.process!
    end
  end

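  # Applies a mass tag edit (start_tags -> result_tags) via Tag.mass_edit,
  # attributed to the stored updater.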
  def execute_mass_tag_edit
    start_tags = data["start_tags"]
    result_tags = data["result_tags"]
    updater_id = data["updater_id"]
    updater_ip_addr = data["updater_ip_addr"]
    Tag.mass_edit(start_tags, result_tags, updater_id, updater_ip_addr)
  end

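  # Approves the tag alias referenced by data["id"], attributed to the
  # stored updater.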
  def execute_approve_tag_alias
    ta = TagAlias.find(data["id"])
    updater_id = data["updater_id"]
    updater_ip_addr = data["updater_ip_addr"]
    ta.approve(updater_id, updater_ip_addr)
  end

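  # Approves the tag implication referenced by data["id"], attributed to the
  # stored updater.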
  def execute_approve_tag_implication
    ti = TagImplication.find(data["id"])
    updater_id = data["updater_id"]
    updater_ip_addr = data["updater_ip_addr"]
    ti.approve(updater_id, updater_ip_addr)
  end

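  # Recalculates tag subscriptions at most once every 20 minutes and records
  # the run time in the job data.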
  def execute_calculate_tag_subscriptions
    # Check for nil before parsing: Time.parse(nil) raises, and a job that
    # has never run has no last_run recorded yet.
    last_run = data["last_run"]
    if last_run.nil? || Time.parse(last_run) < 20.minutes.ago
      TagSubscription.process_all
      update_attributes(:data => {:last_run => Time.now.strftime("%Y-%m-%d %H:%M")})
    end
  end

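  # Recomputes and stores the related-tag list for the tag referenced by
  # data["id"], doing nothing if the tag no longer exists.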
  def execute_calculate_related_tags
    tag_id = data["id"].to_i
    tag = Tag.find_by_id(tag_id)
    if tag
      tag.commit_related(Tag.calculate_related(tag.name))
    end
  end

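  # Incrementally backs up post files (and preview images) to S3, tracking
  # progress with last_id so an interrupted run resumes where it stopped.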
  def execute_s3_backup
    last_id = data["last_id"].to_i

    begin
      # Establish the S3 connection once, rather than once per post.
      AWS::S3::Base.establish_connection!(
        :access_key_id => Danbooru.config.amazon_s3_access_key_id,
        :secret_access_key => Danbooru.config.amazon_s3_secret_access_key
      )

      Post.where("id > ?", last_id).each do |post|
        if File.exists?(post.file_path)
          AWS::S3::S3Object.store(
            post.file_name,
            open(post.file_path, "rb"),
            Danbooru.config.amazon_s3_bucket_name,
            "Content-MD5" => Base64.encode64(post.md5)
          )
        end

        if post.image? && File.exists?(post.preview_path)
          AWS::S3::S3Object.store(
            "preview/#{post.md5}.jpg",
            open(post.preview_path, "rb"),
            Danbooru.config.amazon_s3_bucket_name
          )
        end

        update_attributes(:data => {:last_id => post.id})
      end
    rescue Exception
      # Probably a network error; the next run resumes from last_id.
    end
  end

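  # Builds a short human-readable summary of the job's parameters, or
  # "ERROR" if the referenced records can no longer be loaded.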
  def pretty_data
    begin
      case task_type
      when "mass_tag_edit"
        start = data["start_tags"]
        result = data["result_tags"]
        user = User.id_to_name(data["updater_id"])
        "start:#{start} result:#{result} user:#{user}"

      when "approve_tag_alias"
        ta = TagAlias.find(data["id"])
        "start:#{ta.name} result:#{ta.alias_name}"

      when "approve_tag_implication"
        ti = TagImplication.find(data["id"])
        "start:#{ti.predicate.name} result:#{ti.consequent.name}"

      when "calculate_tag_subscriptions"
        last_run = data["last_run"]
        "last run:#{last_run}"

      when "calculate_related_tags"
        tag = Tag.find_by_id(data["id"])
        if tag
          "tag:#{tag.name}"
        else
          "tag:UNKNOWN"
        end

      when "bandwidth_throttle"
        ""

      when "s3_backup"
        "last_id:" + data["last_id"].to_s
      end
    rescue Exception
      "ERROR"
    end
  end

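  # Number of pending jobs for the given task type.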
  def self.pending_count(task_type)
    where("task_type = ? and status = 'pending'", task_type).count
  end

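  # Runs every pending job once, sleeping one second between jobs.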
  def self.execute_once
    where("status = ?", "pending").each do |task|
      task.execute!
      sleep 1
    end
  end
end