rate limits: rework rate limit implementation.
Rework the rate limit implementation to make it more flexible: * Allow setting different rate limits for different actions. Before we had a single rate limit for all write actions. Now different controller endpoints can have different limits. * Allow actions to be rate limited by user ID, by IP address, or both. Before actions were only limited by user ID, which meant non-logged-in actions like creating new accounts or attempting to login couldn't be rate limited. Also, because actions were limited by user ID only, you could use multiple accounts with the same IP to get around limits. Other changes: * Remove the API Limit field from user profile pages. * Remove the `remaining_api_limit` field from the `/profile.json` endpoint. * Rename the `X-Api-Limit` header to `X-Rate-Limit` and change it from a number to a JSON object containing all the rate limit info (including the refill rate, the burst factor, the cost of the call, and the current limits). * Fix a potential race condition where, if you flooded requests fast enough, you could exceed the rate limit. This was because we checked and updated the rate limit in two separate steps, which was racy; simultaneous requests could pass the check before the update happened. The new code uses some tricky SQL to check and update multiple limits in a single statement.
This commit is contained in:
@@ -2,8 +2,6 @@ class ApplicationController < ActionController::Base
|
||||
include Pundit
|
||||
helper_method :search_params
|
||||
|
||||
class ApiLimitError < StandardError; end
|
||||
|
||||
self.responder = ApplicationResponder
|
||||
|
||||
skip_forgery_protection if: -> { SessionLoader.new(request).has_api_authentication? }
|
||||
@@ -74,17 +72,16 @@ class ApplicationController < ActionController::Base
|
||||
def api_check
|
||||
return if CurrentUser.is_anonymous? || request.get? || request.head?
|
||||
|
||||
if CurrentUser.user.token_bucket.nil?
|
||||
TokenBucket.create_default(CurrentUser.user)
|
||||
CurrentUser.user.reload
|
||||
end
|
||||
rate_limiter = RateLimiter.new(
|
||||
"write",
|
||||
[CurrentUser.user.cache_key],
|
||||
cost: 1,
|
||||
rate: CurrentUser.user.api_regen_multiplier,
|
||||
burst: CurrentUser.user.api_burst_limit
|
||||
)
|
||||
|
||||
throttled = CurrentUser.user.token_bucket.throttled?
|
||||
headers["X-Api-Limit"] = CurrentUser.user.token_bucket.token_count.to_s
|
||||
|
||||
if throttled
|
||||
raise ApiLimitError, "too many requests"
|
||||
end
|
||||
headers["X-Rate-Limit"] = rate_limiter.to_json
|
||||
rate_limiter.limit!
|
||||
end
|
||||
|
||||
def rescue_exception(exception)
|
||||
@@ -113,7 +110,7 @@ class ApplicationController < ActionController::Base
|
||||
render_error_page(410, exception, template: "static/pagination_error", message: "You cannot go beyond page #{CurrentUser.user.page_limit}.")
|
||||
when Post::SearchError
|
||||
render_error_page(422, exception, template: "static/tag_limit_error", message: "You cannot search for more than #{CurrentUser.tag_query_limit} tags at a time.")
|
||||
when ApiLimitError
|
||||
when RateLimiter::RateLimitError
|
||||
render_error_page(429, exception)
|
||||
when NotImplementedError
|
||||
render_error_page(501, exception, message: "This feature isn't available: #{exception.message}")
|
||||
|
||||
@@ -5,13 +5,13 @@ module DanbooruMaintenance
|
||||
safely { Upload.prune! }
|
||||
safely { PostPruner.prune! }
|
||||
safely { PostAppealForumUpdater.update_forum! }
|
||||
safely { RateLimit.prune! }
|
||||
safely { regenerate_post_counts! }
|
||||
end
|
||||
|
||||
def daily
|
||||
safely { Delayed::Job.where('created_at < ?', 45.days.ago).delete_all }
|
||||
safely { PostDisapproval.prune! }
|
||||
safely { TokenBucket.prune! }
|
||||
safely { BulkUpdateRequestPruner.warn_old }
|
||||
safely { BulkUpdateRequestPruner.reject_expired }
|
||||
safely { Ban.prune! }
|
||||
|
||||
30
app/logical/rate_limiter.rb
Normal file
30
app/logical/rate_limiter.rb
Normal file
@@ -0,0 +1,30 @@
|
||||
class RateLimiter
|
||||
class RateLimitError < StandardError; end
|
||||
|
||||
attr_reader :action, :keys, :cost, :rate, :burst
|
||||
|
||||
def initialize(action, keys = ["*"], cost: 1, rate: 1, burst: 1)
|
||||
@action = action
|
||||
@keys = keys
|
||||
@cost = cost
|
||||
@rate = rate
|
||||
@burst = burst
|
||||
end
|
||||
|
||||
def limit!
|
||||
raise RateLimitError if limited?
|
||||
end
|
||||
|
||||
def limited?
|
||||
rate_limits.any?(&:limited?)
|
||||
end
|
||||
|
||||
def as_json(options = {})
|
||||
hash = rate_limits.map { |limit| [limit.key, limit.points] }.to_h
|
||||
super(options).except("keys", "rate_limits").merge(limits: hash)
|
||||
end
|
||||
|
||||
def rate_limits
|
||||
@rate_limits ||= RateLimit.create_or_update!(action: action, keys: keys, cost: cost, rate: rate, burst: burst)
|
||||
end
|
||||
end
|
||||
53
app/models/rate_limit.rb
Normal file
53
app/models/rate_limit.rb
Normal file
@@ -0,0 +1,53 @@
|
||||
class RateLimit < ApplicationRecord
|
||||
scope :expired, -> { where("updated_at < ?", 1.hour.ago) }
|
||||
|
||||
def self.prune!
|
||||
expired.delete_all
|
||||
end
|
||||
|
||||
# `action` is the action being limited. Usually a controller endpoint.
|
||||
# `keys` is who is being limited. Usually a [user, ip] pair, meaning the action is limited both by the user's ID and their IP.
|
||||
# `cost` is the number of points the action costs.
|
||||
# `rate` is the number of points per second that are refilled.
|
||||
# `burst` is the maximum number of points that can be saved up.
|
||||
def self.create_or_update!(action:, keys:, cost:, rate:, burst:)
|
||||
# { key0: keys[0], ..., keyN: keys[N] }
|
||||
key_params = keys.map.with_index { |key, i| [:"key#{i}", key] }.to_h
|
||||
|
||||
# (created_at, updated_at, action, keyN, points)
|
||||
values = keys.map.with_index { |key, i| "(:now, :now, :action, :key#{i}, :points)" }
|
||||
|
||||
# Do an upsert, creating a new rate limit object for each key that doesn't
|
||||
# already exist, and updating the limit for each limit that already exists.
|
||||
#
|
||||
# https://www.postgresql.org/docs/current/sql-insert.html#SQL-ON-CONFLICT
|
||||
sql = <<~SQL
|
||||
INSERT INTO rate_limits (created_at, updated_at, action, key, points)
|
||||
VALUES #{values.join(", ")}
|
||||
ON CONFLICT (action, key) DO UPDATE SET
|
||||
updated_at = :now,
|
||||
limited = rate_limits.points + :rate * EXTRACT(epoch FROM (:now - rate_limits.updated_at)) < 0,
|
||||
points =
|
||||
CASE
|
||||
WHEN rate_limits.points + :rate * EXTRACT(epoch FROM (:now - rate_limits.updated_at)) < 0 THEN
|
||||
LEAST(:burst, rate_limits.points + :rate * EXTRACT(epoch FROM (:now - rate_limits.updated_at)))
|
||||
ELSE
|
||||
LEAST(:burst, rate_limits.points + :rate * EXTRACT(epoch FROM (:now - rate_limits.updated_at))) - :cost
|
||||
END
|
||||
RETURNING *
|
||||
SQL
|
||||
|
||||
sql_params = {
|
||||
now: Time.zone.now,
|
||||
action: action,
|
||||
rate: rate,
|
||||
burst: burst,
|
||||
cost: cost,
|
||||
points: burst - cost,
|
||||
**key_params
|
||||
}
|
||||
|
||||
rate_limits = RateLimit.find_by_sql([sql, sql_params])
|
||||
rate_limits
|
||||
end
|
||||
end
|
||||
@@ -1,41 +0,0 @@
|
||||
class TokenBucket < ApplicationRecord
|
||||
self.primary_key = "user_id"
|
||||
belongs_to :user
|
||||
|
||||
def self.prune!
|
||||
where("last_touched_at < ?", 1.day.ago).delete_all
|
||||
end
|
||||
|
||||
def self.create_default(user)
|
||||
TokenBucket.create(user_id: user.id, token_count: user.api_burst_limit, last_touched_at: Time.now)
|
||||
end
|
||||
|
||||
def accept?
|
||||
token_count >= 1
|
||||
end
|
||||
|
||||
def add!
|
||||
now = Time.now
|
||||
TokenBucket.where(user_id: user_id).update_all(["token_count = least(token_count + (? * extract(epoch from ? - last_touched_at)), ?), last_touched_at = ?", user.api_regen_multiplier, now, user.api_burst_limit, now])
|
||||
|
||||
# estimate the token count to avoid reloading
|
||||
self.token_count += user.api_regen_multiplier * (now - last_touched_at)
|
||||
self.token_count = user.api_burst_limit if token_count > user.api_burst_limit
|
||||
end
|
||||
|
||||
def consume!
|
||||
TokenBucket.where(user_id: user_id).update_all("token_count = greatest(0, token_count - 1)")
|
||||
self.token_count -= 1
|
||||
end
|
||||
|
||||
def throttled?
|
||||
add!
|
||||
|
||||
if accept?
|
||||
consume!
|
||||
return false
|
||||
else
|
||||
return true
|
||||
end
|
||||
end
|
||||
end
|
||||
@@ -134,7 +134,6 @@ class User < ApplicationRecord
|
||||
has_many :user_events, dependent: :destroy
|
||||
has_one :recent_ban, -> {order("bans.id desc")}, :class_name => "Ban"
|
||||
|
||||
has_one :token_bucket
|
||||
has_one :email_address, dependent: :destroy
|
||||
has_many :api_keys, dependent: :destroy
|
||||
has_many :note_versions, :foreign_key => "updater_id"
|
||||
@@ -539,10 +538,6 @@ class User < ApplicationRecord
|
||||
User.api_burst_limit(level)
|
||||
end
|
||||
|
||||
def remaining_api_limit
|
||||
token_bucket.try(:token_count) || api_burst_limit
|
||||
end
|
||||
|
||||
def statement_timeout
|
||||
User.statement_timeout(level)
|
||||
end
|
||||
|
||||
@@ -62,7 +62,7 @@ class UserPolicy < ApplicationPolicy
|
||||
comment_threshold default_image_size
|
||||
favorite_tags blacklisted_tags time_zone per_page
|
||||
custom_style favorite_count api_regen_multiplier
|
||||
api_burst_limit remaining_api_limit statement_timeout
|
||||
api_burst_limit statement_timeout
|
||||
favorite_group_limit favorite_limit tag_query_limit
|
||||
max_saved_searches theme
|
||||
]
|
||||
|
||||
@@ -265,14 +265,6 @@
|
||||
(<%= link_to_wiki "help", "help:api" %>)
|
||||
</td>
|
||||
</tr>
|
||||
|
||||
<tr>
|
||||
<th>API Limits</th>
|
||||
<td>
|
||||
<%= CurrentUser.user.remaining_api_limit %>
|
||||
/ <%= CurrentUser.user.api_burst_limit %> <span class="fineprint">(may not be up to date)</span>
|
||||
</td>
|
||||
</tr>
|
||||
<% end %>
|
||||
</tbody>
|
||||
</table>
|
||||
|
||||
@@ -5,6 +5,6 @@ class CreateTokenBuckets < ActiveRecord::Migration[4.2]
|
||||
end
|
||||
|
||||
def down
|
||||
raise NotImplementedError
|
||||
drop_table :token_buckets
|
||||
end
|
||||
end
|
||||
|
||||
@@ -0,0 +1,24 @@
|
||||
require_relative "20170106012138_create_token_buckets"
|
||||
|
||||
class ReplaceTokenBucketsWithRateLimits < ActiveRecord::Migration[6.1]
|
||||
def change
|
||||
revert CreateTokenBuckets
|
||||
|
||||
create_table :rate_limits do |t|
|
||||
t.timestamps null: false
|
||||
t.boolean :limited, null: false, default: false
|
||||
t.float :points, null: false
|
||||
t.string :action, null: false
|
||||
t.string :key, null: false
|
||||
|
||||
t.index [:key, :action], unique: true
|
||||
end
|
||||
|
||||
reversible do |dir|
|
||||
dir.up do
|
||||
execute "ALTER TABLE rate_limits SET UNLOGGED"
|
||||
execute "ALTER TABLE rate_limits SET (fillfactor = 50)"
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
@@ -2929,6 +2929,41 @@ CREATE SEQUENCE public.posts_id_seq
|
||||
ALTER SEQUENCE public.posts_id_seq OWNED BY public.posts.id;
|
||||
|
||||
|
||||
--
|
||||
-- Name: rate_limits; Type: TABLE; Schema: public; Owner: -
|
||||
--
|
||||
|
||||
CREATE UNLOGGED TABLE public.rate_limits (
|
||||
id bigint NOT NULL,
|
||||
created_at timestamp(6) without time zone NOT NULL,
|
||||
updated_at timestamp(6) without time zone NOT NULL,
|
||||
limited boolean DEFAULT false NOT NULL,
|
||||
points double precision NOT NULL,
|
||||
action character varying NOT NULL,
|
||||
key character varying NOT NULL
|
||||
)
|
||||
WITH (fillfactor='50');
|
||||
|
||||
|
||||
--
|
||||
-- Name: rate_limits_id_seq; Type: SEQUENCE; Schema: public; Owner: -
|
||||
--
|
||||
|
||||
CREATE SEQUENCE public.rate_limits_id_seq
|
||||
START WITH 1
|
||||
INCREMENT BY 1
|
||||
NO MINVALUE
|
||||
NO MAXVALUE
|
||||
CACHE 1;
|
||||
|
||||
|
||||
--
|
||||
-- Name: rate_limits_id_seq; Type: SEQUENCE OWNED BY; Schema: public; Owner: -
|
||||
--
|
||||
|
||||
ALTER SEQUENCE public.rate_limits_id_seq OWNED BY public.rate_limits.id;
|
||||
|
||||
|
||||
--
|
||||
-- Name: saved_searches; Type: TABLE; Schema: public; Owner: -
|
||||
--
|
||||
@@ -3085,17 +3120,6 @@ CREATE SEQUENCE public.tags_id_seq
|
||||
ALTER SEQUENCE public.tags_id_seq OWNED BY public.tags.id;
|
||||
|
||||
|
||||
--
|
||||
-- Name: token_buckets; Type: TABLE; Schema: public; Owner: -
|
||||
--
|
||||
|
||||
CREATE UNLOGGED TABLE public.token_buckets (
|
||||
user_id integer,
|
||||
last_touched_at timestamp without time zone NOT NULL,
|
||||
token_count real NOT NULL
|
||||
);
|
||||
|
||||
|
||||
--
|
||||
-- Name: uploads; Type: TABLE; Schema: public; Owner: -
|
||||
--
|
||||
@@ -4352,6 +4376,13 @@ ALTER TABLE ONLY public.post_votes ALTER COLUMN id SET DEFAULT nextval('public.p
|
||||
ALTER TABLE ONLY public.posts ALTER COLUMN id SET DEFAULT nextval('public.posts_id_seq'::regclass);
|
||||
|
||||
|
||||
--
|
||||
-- Name: rate_limits id; Type: DEFAULT; Schema: public; Owner: -
|
||||
--
|
||||
|
||||
ALTER TABLE ONLY public.rate_limits ALTER COLUMN id SET DEFAULT nextval('public.rate_limits_id_seq'::regclass);
|
||||
|
||||
|
||||
--
|
||||
-- Name: saved_searches id; Type: DEFAULT; Schema: public; Owner: -
|
||||
--
|
||||
@@ -4739,6 +4770,14 @@ ALTER TABLE ONLY public.posts
|
||||
ADD CONSTRAINT posts_pkey PRIMARY KEY (id);
|
||||
|
||||
|
||||
--
|
||||
-- Name: rate_limits rate_limits_pkey; Type: CONSTRAINT; Schema: public; Owner: -
|
||||
--
|
||||
|
||||
ALTER TABLE ONLY public.rate_limits
|
||||
ADD CONSTRAINT rate_limits_pkey PRIMARY KEY (id);
|
||||
|
||||
|
||||
--
|
||||
-- Name: saved_searches saved_searches_pkey; Type: CONSTRAINT; Schema: public; Owner: -
|
||||
--
|
||||
@@ -7280,6 +7319,13 @@ CREATE INDEX index_posts_on_uploader_id ON public.posts USING btree (uploader_id
|
||||
CREATE INDEX index_posts_on_uploader_ip_addr ON public.posts USING btree (uploader_ip_addr);
|
||||
|
||||
|
||||
--
|
||||
-- Name: index_rate_limits_on_key_and_action; Type: INDEX; Schema: public; Owner: -
|
||||
--
|
||||
|
||||
CREATE UNIQUE INDEX index_rate_limits_on_key_and_action ON public.rate_limits USING btree (key, action);
|
||||
|
||||
|
||||
--
|
||||
-- Name: index_saved_searches_on_labels; Type: INDEX; Schema: public; Owner: -
|
||||
--
|
||||
@@ -7385,13 +7431,6 @@ CREATE INDEX index_tags_on_name_trgm ON public.tags USING gin (name public.gin_t
|
||||
CREATE INDEX index_tags_on_post_count ON public.tags USING btree (post_count);
|
||||
|
||||
|
||||
--
|
||||
-- Name: index_token_buckets_on_user_id; Type: INDEX; Schema: public; Owner: -
|
||||
--
|
||||
|
||||
CREATE UNIQUE INDEX index_token_buckets_on_user_id ON public.token_buckets USING btree (user_id);
|
||||
|
||||
|
||||
--
|
||||
-- Name: index_uploads_on_referer_url; Type: INDEX; Schema: public; Owner: -
|
||||
--
|
||||
@@ -7964,6 +8003,7 @@ INSERT INTO "schema_migrations" (version) VALUES
|
||||
('20210127000201'),
|
||||
('20210127012303'),
|
||||
('20210214095121'),
|
||||
('20210214101614');
|
||||
('20210214101614'),
|
||||
('20210303195217');
|
||||
|
||||
|
||||
|
||||
8
test/factories/rate_limit.rb
Normal file
8
test/factories/rate_limit.rb
Normal file
@@ -0,0 +1,8 @@
|
||||
FactoryBot.define do
|
||||
factory(:rate_limit) do
|
||||
limited { false }
|
||||
points { 0 }
|
||||
action { "test" }
|
||||
key { "1234" }
|
||||
end
|
||||
end
|
||||
@@ -227,7 +227,7 @@ class ApplicationControllerTest < ActionDispatch::IntegrationTest
|
||||
should "fail with a 429 error" do
|
||||
user = create(:user)
|
||||
post = create(:post, rating: "s")
|
||||
TokenBucket.any_instance.stubs(:throttled?).returns(true)
|
||||
RateLimit.any_instance.stubs(:limited?).returns(true)
|
||||
|
||||
put_auth post_path(post), user, params: { post: { rating: "e" } }
|
||||
|
||||
|
||||
72
test/unit/rate_limit_test.rb
Normal file
72
test/unit/rate_limit_test.rb
Normal file
@@ -0,0 +1,72 @@
|
||||
require 'test_helper'
|
||||
|
||||
class RateLimitTest < ActiveSupport::TestCase
|
||||
context "RateLimit: " do
|
||||
context "#limit! method" do
|
||||
should "create a new rate limit object if none exists, or update it if it already exists" do
|
||||
assert_difference("RateLimit.count", 1) do
|
||||
RateLimiter.new("write", ["users/1"]).limited?
|
||||
end
|
||||
|
||||
assert_difference("RateLimit.count", 0) do
|
||||
RateLimiter.new("write", ["users/1"]).limited?
|
||||
end
|
||||
|
||||
assert_difference("RateLimit.count", 1) do
|
||||
RateLimiter.new("write", ["users/1", "ip/1.2.3.4"]).limited?
|
||||
end
|
||||
|
||||
assert_difference("RateLimit.count", 0) do
|
||||
RateLimiter.new("write", ["users/1", "ip/1.2.3.4"]).limited?
|
||||
end
|
||||
end
|
||||
|
||||
should "include the cost of the first action when initializing the limit" do
|
||||
limiter = RateLimiter.new("write", ["users/1"], burst: 10, cost: 1)
|
||||
assert_equal(9, limiter.rate_limits.first.points)
|
||||
end
|
||||
|
||||
should "be limited if the point count is negative" do
|
||||
freeze_time
|
||||
create(:rate_limit, action: "write", key: "users/1", points: -1)
|
||||
limiter = RateLimiter.new("write", ["users/1"], cost: 1)
|
||||
|
||||
assert_equal(true, limiter.limited?)
|
||||
assert_equal(-1, limiter.rate_limits.first.points)
|
||||
end
|
||||
|
||||
should "not be limited if the point count was positive before the action" do
|
||||
freeze_time
|
||||
create(:rate_limit, action: "write", key: "users/1", points: 0.01)
|
||||
limiter = RateLimiter.new("write", ["users/1"], cost: 1)
|
||||
|
||||
assert_equal(false, limiter.limited?)
|
||||
assert_equal(-0.99, limiter.rate_limits.first.points)
|
||||
end
|
||||
|
||||
should "refill the points at the correct rate" do
|
||||
freeze_time
|
||||
create(:rate_limit, action: "write", key: "users/1", points: -2)
|
||||
|
||||
limiter = RateLimiter.new("write", ["users/1"], cost: 1, rate: 1, burst: 10)
|
||||
assert_equal(true, limiter.limited?)
|
||||
assert_equal(-2, limiter.rate_limits.first.points)
|
||||
|
||||
travel 1.second
|
||||
limiter = RateLimiter.new("write", ["users/1"], cost: 1, rate: 1, burst: 10)
|
||||
assert_equal(true, limiter.limited?)
|
||||
assert_equal(-1, limiter.rate_limits.first.points)
|
||||
|
||||
travel 5.second
|
||||
limiter = RateLimiter.new("write", ["users/1"], cost: 1, rate: 1, burst: 10)
|
||||
assert_equal(false, limiter.limited?)
|
||||
assert_equal(3, limiter.rate_limits.first.points)
|
||||
|
||||
travel 60.second
|
||||
limiter = RateLimiter.new("write", ["users/1"], cost: 1, rate: 1, burst: 10)
|
||||
assert_equal(false, limiter.limited?)
|
||||
assert_equal(9, limiter.rate_limits.first.points)
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
@@ -1,45 +0,0 @@
|
||||
require 'test_helper'
|
||||
|
||||
class TokenBucketTest < ActiveSupport::TestCase
|
||||
context "#add!" do
|
||||
setup do
|
||||
@user = FactoryBot.create(:user)
|
||||
TokenBucket.create(user_id: @user.id, last_touched_at: 1.minute.ago, token_count: 0)
|
||||
end
|
||||
|
||||
should "work" do
|
||||
@user.token_bucket.add!
|
||||
assert_operator(@user.token_bucket.token_count, :>, 0)
|
||||
@user.reload
|
||||
assert_operator(@user.token_bucket.token_count, :>, 0)
|
||||
end
|
||||
end
|
||||
|
||||
context "#consume!" do
|
||||
setup do
|
||||
@user = FactoryBot.create(:user)
|
||||
TokenBucket.create(user_id: @user.id, last_touched_at: 1.minute.ago, token_count: 1)
|
||||
end
|
||||
|
||||
should "work" do
|
||||
@user.token_bucket.consume!
|
||||
assert_operator(@user.token_bucket.token_count, :<, 1)
|
||||
@user.reload
|
||||
assert_operator(@user.token_bucket.token_count, :<, 1)
|
||||
end
|
||||
end
|
||||
|
||||
context "#throttled?" do
|
||||
setup do
|
||||
@user = FactoryBot.create(:user)
|
||||
TokenBucket.create(user_id: @user.id, last_touched_at: 1.minute.ago, token_count: 0)
|
||||
end
|
||||
|
||||
should "work" do
|
||||
assert(!@user.token_bucket.throttled?)
|
||||
assert_operator(@user.token_bucket.token_count, :<, 60)
|
||||
@user.reload
|
||||
assert_operator(@user.token_bucket.token_count, :<, 60)
|
||||
end
|
||||
end
|
||||
end
|
||||
Reference in New Issue
Block a user