Skip to content

Commit

Permalink
Extract memory lock and use it
Browse files Browse the repository at this point in the history
We actually need it for the case that multiple requests on the same process are using the same DB connection.

Closes #18
  • Loading branch information
julik committed Feb 22, 2024
1 parent a368bd7 commit 65084eb
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 30 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## [1.2.0] - 2024-02-22

- Use memory locking in addition to DB locking in `ActiveRecordBackend`

## [1.1.0] - 2024-02-22

- Use modern ActiveRecord migration options for better Rails 7.x compatibility
Expand Down
2 changes: 2 additions & 0 deletions lib/idempo.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@
require "measurometer"
require "msgpack"
require "zlib"
require "set"

require_relative "idempo/active_record_backend"
require_relative "idempo/concurrent_request_error_app"
require_relative "idempo/malformed_key_error_app"
require_relative "idempo/memory_backend"
require_relative "idempo/redis_backend"
require_relative "idempo/request_fingerprint"
require_relative "idempo/memory_lock"
require_relative "idempo/version"

class Idempo
Expand Down
29 changes: 20 additions & 9 deletions lib/idempo/active_record_backend.rb
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ def derive_lock_name(from_str)

def initialize
require "active_record"
@memory_lock = Idempo::MemoryLock.new
end

# Allows the model to be defined lazily without having to require active_record when this module gets loaded
Expand All @@ -70,15 +71,25 @@ def model
end

def with_idempotency_key(request_key)
db_safe_key = Digest::SHA1.base64digest(request_key)
lock = lock_implementation_for_connection(model.connection)

raise Idempo::ConcurrentRequest unless lock.acquire(model.connection, request_key)

begin
yield(Store.new(db_safe_key, model))
ensure
lock.release(model.connection, request_key)
# We need to use an in-memory lock because database advisory locks are
# reentrant. Both Postgres and MySQL allow multiple acquisitions of the
# same advisory lock within the same connection - in most Rails/Rack apps
# this translates to "within the same thread". This means that if one
# elects to use a non-threading webserver (like Falcon), or tests Idempo
# within the same thread (like we do), they won't get advisory locking
# for concurrent requests. Therefore a staged lock is required. First we apply
# the memory lock (for same thread on this process/multiple threads on this
# process) and then once we have that - the DB lock.
@memory_lock.with(request_key) do
db_safe_key = Digest::SHA1.base64digest(request_key)
database_lock = lock_implementation_for_connection(model.connection)
raise Idempo::ConcurrentRequest unless database_lock.acquire(model.connection, request_key)

begin
yield(Store.new(db_safe_key, model))
ensure
database_lock.release(model.connection, request_key)
end
end
end

Expand Down
24 changes: 4 additions & 20 deletions lib/idempo/memory_backend.rb
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
class Idempo::MemoryBackend
def initialize
require "set"
require_relative "response_store"

@requests_in_flight_mutex = Mutex.new
@in_progress = Set.new
@store_mutex = Mutex.new
@lock = Idempo::MemoryLock.new
@response_store = Idempo::ResponseStore.new
@store_mutex = Mutex.new
end

class Store < Struct.new(:store_mutex, :response_store, :key, keyword_init: true)
Expand All @@ -24,22 +21,9 @@ def store(data:, ttl:)
end

def with_idempotency_key(request_key)
did_insert = @requests_in_flight_mutex.synchronize do
if @in_progress.include?(request_key)
false
else
@in_progress << request_key
true
end
end

raise Idempo::ConcurrentRequest unless did_insert

store = Store.new(store_mutex: @store_mutex, response_store: @response_store, key: request_key)
begin
@lock.with(request_key) do
store = Store.new(store_mutex: @store_mutex, response_store: @response_store, key: request_key)
yield(store)
ensure
@requests_in_flight_mutex.synchronize { @in_progress.delete(request_key) }
end
end

Expand Down
21 changes: 21 additions & 0 deletions lib/idempo/memory_lock.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# A memory lock prevents multiple requests with the same request
# fingerprint from running concurrently
class Idempo::MemoryLock
def initialize
@requests_in_flight_mutex = Mutex.new
@in_progress = Set.new
end

def with(request_key)
@requests_in_flight_mutex.synchronize do
if @in_progress.include?(request_key)
raise Idempo::ConcurrentRequest
else
@in_progress << request_key
end
end
yield
ensure
@requests_in_flight_mutex.synchronize { @in_progress.delete(request_key) }
end
end
2 changes: 1 addition & 1 deletion lib/idempo/version.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# frozen_string_literal: true

class Idempo
VERSION = "1.1.0"
VERSION = "1.2.0"
end

0 comments on commit 65084eb

Please sign in to comment.