danbooru/app/logical/bigquery_export_service.rb

evazion, commit f02b437085, 2021-12-08: bigquery: temp disable dumping the posts table.
Dumping the posts table to BigQuery tends to time out and leave stuck jobs in the
jobs table. Disable it until it can be fixed.

# Perform a daily database dump to BigQuery and to Google Cloud Storage. This
# contains all data visible to anonymous users.
#
# The database dumps are publicly accessible. The BigQuery data is at
# `danbooru1.danbooru_public.{table}`. The Google Cloud Storage data is at
# `gs://danbooru_public/data/{table}.json`. The storage bucket contains the data
# in newline-delimited JSON format.
#
# @see DanbooruMaintenance#daily
# @see https://console.cloud.google.com/storage/browser/danbooru_public
# @see https://console.cloud.google.com/bigquery?d=danbooru_public&p=danbooru1&t=posts&page=table
# @see https://cloud.google.com/bigquery/docs
# @see https://cloud.google.com/storage/docs
# @see https://en.wikipedia.org/wiki/JSON_streaming#Line-delimited_JSON
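#
# Example usage (illustrative sketch; assumes Google Cloud credentials are
# configured and that Tag is one of the exported models):
#
#   BigqueryExportService.new(Tag).export!   # dump a single table
#   BigqueryExportService.async_export_all!  # queue background exports for every table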
class BigqueryExportService
  extend Memoist

  attr_reader :model, :dataset_name, :credentials

  # Prepare to dump a table. Call {#export!} to dump it.
  # @param model [ApplicationRecord] the database table to dump
  # @param dataset_name [String] the BigQuery dataset name
  # @param credentials [Hash] the Google Cloud credentials (the parsed JSON keyfile)
  def initialize(model = nil, dataset_name: "danbooru_public", credentials: default_credentials)
    @model = model
    @dataset_name = dataset_name
    @credentials = credentials
  end

  # Start a background job for each table to export it to BigQuery.
  def self.async_export_all!(**options)
    models.each do |model|
      BigqueryExportJob.perform_later(model: model, **options)
    end
  end
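
  # BigqueryExportJob is defined elsewhere in the app. A minimal sketch of its
  # likely shape (an assumption, not the actual implementation) would be an
  # ActiveJob that calls back into this service:
  #
  #   class BigqueryExportJob < ApplicationJob
  #     def perform(model:, **options)
  #       BigqueryExportService.new(model, **options).export!
  #     end
  #   end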

  # The list of database tables to dump.
  def self.models
    Rails.application.eager_load!
    models = ApplicationRecord.descendants.sort_by(&:name)
    # Post is excluded for now because dumping the posts table tends to time out
    # and leave stuck jobs in the jobs table (commit f02b437085).
    models -= [Post, Favorite, IpAddress, TagRelationship, ArtistVersion, ArtistCommentaryVersion, NoteVersion, PoolVersion, PostVersion, WikiPageVersion]
    models
  end

  # @return [Boolean] whether Google Cloud credentials are configured, i.e. whether the export can run.
  def enabled?
    credentials.present?
  end

  # Dump the table to Cloud Storage and BigQuery.
  def export!
    return unless enabled? && records.any?

    file = dump_records!
    upload_to_bigquery!(file)
  end

  # Dump the table's records to a gzipped, newline-delimited JSON tempfile.
  # @return [Zlib::GzipWriter] the closed gzip stream wrapping the tempfile
  def dump_records!
    file = Tempfile.new("danbooru-export-dump-", binmode: true)
    file = Zlib::GzipWriter.new(file)

    CurrentUser.scoped(User.anonymous) do
      records.find_each(batch_size: 5_000) do |record|
        file.puts(record.to_json)
      end
    end

    file.close # flush the gzip footer
    file
  end

  # Upload the JSON dump to Cloud Storage, then load it into BigQuery.
  # @param file [Zlib::GzipWriter] the gzipped dump produced by {#dump_records!}
  def upload_to_bigquery!(file)
    table_name = model.model_name.collection
    gsfilename = "data/#{table_name}.json"

    gsfile = bucket.create_file(file.path, gsfilename, content_encoding: "gzip")
    job = dataset.load_job(table_name, gsfile, format: "json", autodetect: true, create: "needed", write: "truncate")
    job.wait_until_done!
    job
  end
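
  # Illustrative only (not used by this service): once the load job completes, the
  # loaded table can be queried through the same client, e.g.
  #
  #   dataset.query("SELECT COUNT(*) AS row_count FROM tags")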

  # The list of records to dump, i.e. the records visible to anonymous users.
  def records
    model.visible(User.anonymous)
  end

  # Find or create the BigQuery dataset.
  def dataset
    bigquery.dataset(dataset_name) || bigquery.create_dataset(dataset_name)
  end

  # Find or create the Google Cloud Storage bucket.
  def bucket
    storage.bucket(dataset_name) || storage.create_bucket(dataset_name, acl: "public", default_acl: "public", storage_class: "standard", location: "us-east1")
  end

  # The BigQuery API client.
  def bigquery
    Google::Cloud::Bigquery.new(credentials: credentials)
  end

  # The Cloud Storage API client.
  def storage
    Google::Cloud::Storage.new(credentials: credentials)
  end

  # The Google Cloud credentials from the Danbooru config, parsed from JSON, or nil if they aren't configured.
  def default_credentials
    return nil unless Danbooru.config.google_cloud_credentials.present?
    JSON.parse(Danbooru.config.google_cloud_credentials)
  end

  memoize :dataset, :bucket, :bigquery, :storage, :default_credentials
end