Skip to content

Commit

Permalink
Merge pull request #2 from ganmacs/concurrency-on-single-tcp-connection
Browse files Browse the repository at this point in the history
Concurrency on single tcp connection
  • Loading branch information
ganmacs authored Jan 28, 2019
2 parents 3be4d52 + 36dd044 commit 1306f0f
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 50 deletions.
19 changes: 0 additions & 19 deletions lib/griffin/counting_semaphore.rb

This file was deleted.

6 changes: 5 additions & 1 deletion lib/griffin/engine/server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,11 @@ module Server
def initialize
@core = Griffin::Server.new(
pool_size: config[:pool_size],
interceptors: config[:interceptors]
min_pool_size: config[:min_pool_size],
max_pool_size: config[:max_pool_size],
min_connection_size: config[:min_connection_size],
max_connection_size: config[:max_connection_size],
interceptors: config[:interceptors],
)
end

Expand Down
5 changes: 4 additions & 1 deletion lib/griffin/engine/single.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ module Engine
class Single
def self.create(config)
serv = Griffin::Server.new(
pool_size: config[:pool_size],
min_pool_size: config[:min_pool_size],
max_pool_size: config[:max_pool_size],
min_connection_size: config[:min_connection_size],
max_connection_size: config[:max_connection_size],
interceptors: config[:interceptors],
)
new(serv, config)
Expand Down
14 changes: 9 additions & 5 deletions lib/griffin/server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,15 @@ def config_builder
end
end

# @param pool_size [Integer] Worker thread size
# @param min_pool_size [Integer] Worker thread mininum size
# @param max_pool_size [Integer] Worker thread maximun size
# @param min_connection_size [Integer] Maximun connection of TCP
# @param max_connection_size [Integer] Minimum connection of TCP
# @param interceptors [Array<GrpcKit::GRPC::ServerInterceptor>] list of interceptors
def initialize(pool_size:, interceptors: [], **opts)
@worker_size = pool_size
@server = GrpcKit::Server.new(interceptors: interceptors)
def initialize(min_pool_size:, max_pool_size:, min_connection_size:, max_connection_size:, interceptors: [], **opts)
@min_connection_size = min_connection_size
@max_connection_size = max_connection_size
@server = GrpcKit::Server.new(interceptors: interceptors, min_pool_size: min_pool_size, max_pool_size: max_pool_size)
@opts = opts
@status = :run
@worker_id = 0
Expand All @@ -65,7 +69,7 @@ def before_run(worker_id = 0)
def run(sock, blocking: true)
@socks << sock

@thread_pool = Griffin::ThreadPool.new(@worker_size) do |conn|
@thread_pool = Griffin::ThreadPool.new(min: @min_connection_size, max: @max_connection_size) do |conn|
@server.run(conn)
end

Expand Down
27 changes: 21 additions & 6 deletions lib/griffin/server_config_builder.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,15 @@ class ServerConfigBuilder
# Users can't change these values
SERVERENGIEN_FIXED_CONFIGS = %i[daemonize worker_type worker_process_name].freeze

# The default size of thread pool
DEFAULT_POOL_SIZE = 10
# The default size of thread pool TCP Connection
DEFAULT_POOL_SIZE = 30
DEFAULT_CONNECTION_SIZE = 3

GRIFFIN_CONFIGS = [
# The size of thread pool
:pool_size
:max_pool_size,
:min_pool_size,
:max_connection_size,
:min_connection_size,
].freeze

GRPC_CONFIGS = %i[services interceptors].freeze
Expand All @@ -31,7 +34,10 @@ def to_h
workers: 1,
bind: '0.0.0.0',
port: 50051,
pool_size: DEFAULT_POOL_SIZE,
max_pool_size: DEFAULT_POOL_SIZE,
min_pool_size: DEFAULT_POOL_SIZE,
max_connection_size: DEFAULT_CONNECTION_SIZE,
min_connection_size: DEFAULT_CONNECTION_SIZE,
interceptors: [],
services: [],
}.freeze
Expand All @@ -40,7 +46,7 @@ def initialize
@opts = DEFAULT_SERVER_CONFIG.dup
end

(SERVERENGINE_PRIMITIVE_CONFIGS + GRIFFIN_CONFIGS).each do |name|
(SERVERENGINE_PRIMITIVE_CONFIGS).each do |name|
define_method(name) do |value|
@opts[name] = value
end
Expand All @@ -52,6 +58,15 @@ def initialize
end
end

def pool_size(min, max)
@opts[:min_pool_size] = Integer(min)
@opts[:max_pool_size] = Integer(max)
end

def connection_size(min, max)
@opts[:min_connection_size] = Integer(min)
@opts[:max_connection_size] = Integer(max)
end
def interceptors(*value)
@opts[:interceptors].concat(value).flatten!
end
Expand Down
44 changes: 26 additions & 18 deletions lib/griffin/thread_pool.rb
Original file line number Diff line number Diff line change
@@ -1,25 +1,27 @@
# frozen_string_literal: true

require 'griffin/counting_semaphore'
require 'grpc_kit/thread_pool/auto_trimmer'

module Griffin
class ThreadPool
DEFAULT_POOL_SIZE = 20
DEFAULT_QUEUE_SIZE = 512
DEFAULT_MAX = 5
DEFAULT_MIN = 1
QUEUE_SIZE = 128

def initialize(pool_size = DEFAULT_POOL_SIZE, queue_size: DEFAULT_QUEUE_SIZE, &block)
@pool_size = pool_size
@queue_size = queue_size
def initialize(interval: 60, max: DEFAULT_MAX, min: DEFAULT_MIN, &block)
@max_pool_size = max
@min_pool_size = min
@block = block
@shutdown = false
@semaphore = Griffin::CountingSemaphore.new(queue_size)
@tasks = Queue.new
@tasks = SizedQueue.new(QUEUE_SIZE)

@spawned = 0
@workers = []
@mutex = Mutex.new
@waiting = 0

@pool_size.times { spawn_thread }
@min_pool_size.times { spawn_thread }
@auto_trimmer = GrpcKit::ThreadPool::AutoTrimmer.new(self, interval: interval + rand(10)).tap(&:start!)
end

def schedule(task, &block)
Expand All @@ -32,25 +34,31 @@ def schedule(task, &block)
end

# TODO: blocking now..
@semaphore.wait
@tasks.push(block || task)

@mutex.synchronize do
if @spawned < @pool_size
spawn_thread
end
if @mutex.synchronize { (@waiting < @tasks.size) && (@spawned < @max_pool_size) }
spawn_thread
end
end

def shutdown
@shutdown = true
@pool_size.times { @tasks.push(nil) }
@max_pool_size.times { @tasks.push(nil) }
@auto_trimmer.stop
until @workers.empty?
Griffin.logger.debug("#{@pool_size - @spawned} worker thread(s) shutdowned, waiting #{@spawned}")
Griffin.logger.debug("Shutdown waiting #{@waiting} workers")
sleep 1
end
end

# For GrpcKit::ThreadPool::AutoTrimmer
def trim(force = false)
if @mutex.synchronize { (force || (@waiting > 0)) && (@spawned > @min_pool_size) }
GrpcKit.logger.info("Trim worker! Next worker size #{@spawned - 1}")
@tasks.push(nil)
end
end

private

def spawn_thread
Expand All @@ -64,7 +72,9 @@ def spawn_thread
break
end

@mutex.synchronize { @waiting += 1 }
task = @tasks.pop
@mutex.synchronize { @waiting -= 1 }
if task.nil?
break
end
Expand All @@ -73,8 +83,6 @@ def spawn_thread
@block.call(task)
rescue Exception => e # rubocop:disable Lint/RescueException
Griffin.logger.error("An error occured on top level in worker #{Thread.current.name}: #{e.message} (#{e.class})\n #{Thread.current.backtrace.join("\n")} ")
ensure
@semaphore.signal
end
end

Expand Down

0 comments on commit 1306f0f

Please sign in to comment.