Skip to content

Commit 2998852

Browse files
authored
feat: add thread pool with promises to limit concurrent sideloading (#472)
Previously, sideloading in Graphiti used an unbounded number of threads, which could decrease performance due to resource contention and exhaust the ActiveRecord connection pool, leading to timeouts and connection errors. This commit introduces a thread pool based on ActiveRecord's global_thread_pool_async_query_executor, limiting the maximum number of resources that can be sideloaded concurrently. With a properly configured connection pool, this ensures that the ActiveRecord connection pool is not exhausted by the sideloading process.
1 parent 878ed02 commit 2998852

9 files changed

+499
-140
lines changed

graphiti.gemspec

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ Gem::Specification.new do |spec|
2222
spec.add_dependency "jsonapi-renderer", "~> 0.2", ">= 0.2.2"
2323
spec.add_dependency "dry-types", ">= 0.15.0", "< 2.0"
2424
spec.add_dependency "graphiti_errors", "~> 1.1.0"
25-
spec.add_dependency "concurrent-ruby", "~> 1.0"
25+
spec.add_dependency "concurrent-ruby", ">= 1.2", "< 2.0"
2626
spec.add_dependency "activesupport", ">= 5.2"
2727

2828
spec.add_development_dependency "faraday", "~> 0.15"

lib/graphiti/configuration.rb

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,20 @@ class Configuration
88
# Defaults to false OR if classes are cached (Rails-only)
99
attr_accessor :concurrency
1010

11+
# This number must be considered in accordance with the database
12+
# connection pool size configured in `database.yml`. The connection
13+
# pool should be large enough to accommodate both the foreground
14+
# threads (ie. web server or job worker threads) and background
15+
# threads. For each process, Graphiti will create one global
16+
# executor that uses this many threads to sideload resources
17+
# asynchronously. Thus, the pool size should be at least
18+
# `thread_count + concurrency_max_threads + 1`. For example, if your
19+
# web server has a maximum of 3 threads, and
20+
# `concurrency_max_threads` is set to 4, then your pool size should
21+
# be at least 8.
22+
# @return [Integer] Maximum number of threads to use when fetching sideloads concurrently
23+
attr_accessor :concurrency_max_threads
24+
1125
attr_accessor :respond_to
1226
attr_accessor :context_for_endpoint
1327
attr_accessor :links_on_demand
@@ -28,6 +42,7 @@ class Configuration
2842
def initialize
2943
@raise_on_missing_sideload = true
3044
@concurrency = false
45+
@concurrency_max_threads = 4
3146
@respond_to = [:json, :jsonapi, :xml]
3247
@links_on_demand = false
3348
@pagination_links_on_demand = false

lib/graphiti/resource_proxy.rb

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -77,16 +77,24 @@ def as_graphql(options = {})
7777
def data
7878
@data ||= begin
7979
records = @scope.resolve
80-
if records.empty? && raise_on_missing?
81-
raise Graphiti::Errors::RecordNotFound
82-
end
80+
raise Graphiti::Errors::RecordNotFound if records.empty? && raise_on_missing?
81+
8382
records = records[0] if single?
8483
records
8584
end
8685
end
8786
alias_method :to_a, :data
8887
alias_method :resolve_data, :data
8988

89+
def future_resolve_data
90+
@scope.future_resolve.then do |records|
91+
raise Graphiti::Errors::RecordNotFound if records.empty? && raise_on_missing?
92+
93+
records = records[0] if single?
94+
@data = records
95+
end
96+
end
97+
9098
def meta
9199
@meta ||= data.respond_to?(:meta) ? data.meta : {}
92100
end

lib/graphiti/scope.rb

Lines changed: 117 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,35 @@ module Graphiti
22
class Scope
33
attr_accessor :object, :unpaginated_object
44
attr_reader :pagination
5+
6+
GLOBAL_THREAD_POOL_EXECUTOR_BROADCAST_STATS = %i[
7+
length max_length queue_length max_queue completed_task_count largest_length scheduled_task_count synchronous
8+
]
9+
GLOBAL_THREAD_POOL_EXECUTOR = Concurrent::Promises.delay do
10+
if Graphiti.config.concurrency
11+
concurrency = Graphiti.config.concurrency_max_threads || 4
12+
Concurrent::ThreadPoolExecutor.new(
13+
min_threads: 0,
14+
max_threads: concurrency,
15+
max_queue: concurrency * 4,
16+
fallback_policy: :caller_runs
17+
)
18+
else
19+
Concurrent::ThreadPoolExecutor.new(max_threads: 0, synchronous: true, fallback_policy: :caller_runs)
20+
end
21+
end
22+
private_constant :GLOBAL_THREAD_POOL_EXECUTOR, :GLOBAL_THREAD_POOL_EXECUTOR_BROADCAST_STATS
23+
24+
def self.global_thread_pool_executor
25+
GLOBAL_THREAD_POOL_EXECUTOR.value!
26+
end
27+
28+
def self.global_thread_pool_stats
29+
GLOBAL_THREAD_POOL_EXECUTOR_BROADCAST_STATS.each_with_object({}) do |key, memo|
30+
memo[key] = global_thread_pool_executor.send(key)
31+
end
32+
end
33+
534
def initialize(object, resource, query, opts = {})
635
@object = object
736
@resource = resource
@@ -14,57 +43,33 @@ def initialize(object, resource, query, opts = {})
1443
end
1544

1645
def resolve
17-
if @query.zero_results?
18-
[]
19-
else
20-
resolved = broadcast_data { |payload|
21-
@object = @resource.before_resolve(@object, @query)
22-
payload[:results] = @resource.resolve(@object)
23-
payload[:results]
24-
}
25-
resolved.compact!
26-
assign_serializer(resolved)
27-
yield resolved if block_given?
28-
@opts[:after_resolve]&.call(resolved)
29-
resolve_sideloads(resolved) unless @query.sideloads.empty?
30-
resolved
31-
end
46+
future_resolve.value!
3247
end
3348

3449
def resolve_sideloads(results)
35-
return if results == []
50+
future_resolve_sideloads(results).value!
51+
end
3652

37-
concurrent = Graphiti.config.concurrency
38-
promises = []
53+
def future_resolve
54+
return Concurrent::Promises.fulfilled_future([], self.class.global_thread_pool_executor) if @query.zero_results?
3955

40-
@query.sideloads.each_pair do |name, q|
41-
sideload = @resource.class.sideload(name)
42-
next if sideload.nil? || sideload.shared_remote?
43-
parent_resource = @resource
44-
graphiti_context = Graphiti.context
45-
resolve_sideload = -> {
46-
Graphiti.config.before_sideload&.call(graphiti_context)
47-
Graphiti.context = graphiti_context
48-
sideload.resolve(results, q, parent_resource)
49-
@resource.adapter.close if concurrent
50-
}
51-
if concurrent
52-
promises << Concurrent::Promise.execute(&resolve_sideload)
53-
else
54-
resolve_sideload.call
55-
end
56+
resolved = broadcast_data { |payload|
57+
@object = @resource.before_resolve(@object, @query)
58+
payload[:results] = @resource.resolve(@object)
59+
payload[:results]
60+
}
61+
resolved.compact!
62+
assign_serializer(resolved)
63+
yield resolved if block_given?
64+
@opts[:after_resolve]&.call(resolved)
65+
sideloaded = @query.parents.any?
66+
close_adapter = Graphiti.config.concurrency && sideloaded
67+
if close_adapter
68+
@resource.adapter.close
5669
end
5770

58-
if concurrent
59-
# Wait for all promises to finish
60-
sleep 0.01 until promises.all? { |p| p.fulfilled? || p.rejected? }
61-
# Re-raise the error with correct stacktrace
62-
# OPTION** to avoid failing here?? if so need serializable patch
63-
# to avoid loading data when association not loaded
64-
if (rejected = promises.find(&:rejected?))
65-
raise rejected.reason
66-
end
67-
end
71+
future_resolve_sideloads(resolved)
72+
.then_on(self.class.global_thread_pool_executor, resolved) { resolved }
6873
end
6974

7075
def parent_resource
@@ -108,6 +113,74 @@ def updated_at
108113

109114
private
110115

116+
def future_resolve_sideloads(results)
117+
return Concurrent::Promises.fulfilled_future(nil, self.class.global_thread_pool_executor) if results == []
118+
119+
sideload_promises = @query.sideloads.filter_map do |name, q|
120+
sideload = @resource.class.sideload(name)
121+
next if sideload.nil? || sideload.shared_remote?
122+
123+
p = future_with_context(results, q, @resource) do |parent_results, sideload_query, parent_resource|
124+
Graphiti.config.before_sideload&.call(Graphiti.context)
125+
sideload.future_resolve(parent_results, sideload_query, parent_resource)
126+
end
127+
p.flat
128+
end
129+
130+
Concurrent::Promises.zip_futures_on(self.class.global_thread_pool_executor, *sideload_promises)
131+
.rescue_on(self.class.global_thread_pool_executor) do |*reasons|
132+
first_error = reasons.find { |r| r.is_a?(Exception) }
133+
raise first_error
134+
end
135+
end
136+
137+
def future_with_context(*args)
138+
thread_storage = Thread.current.keys.each_with_object({}) do |key, memo|
139+
memo[key] = Thread.current[key]
140+
end
141+
fiber_storage =
142+
if Fiber.current.respond_to?(:storage)
143+
Fiber.current&.storage&.keys&.each_with_object({}) do |key, memo|
144+
memo[key] = Fiber[key]
145+
end
146+
end
147+
148+
Concurrent::Promises.future_on(
149+
self.class.global_thread_pool_executor, Thread.current.object_id, thread_storage, fiber_storage, *args
150+
) do |thread_id, thread_storage, fiber_storage, *args|
151+
wrap_in_rails_executor do
152+
execution_context_changed = thread_id != Thread.current.object_id
153+
if execution_context_changed
154+
thread_storage&.keys&.each_with_object(Thread.current) do |key, thread_current|
155+
thread_current[key] = thread_storage[key]
156+
end
157+
fiber_storage&.keys&.each_with_object(Fiber) do |key, fiber_current|
158+
fiber_current[key] = fiber_storage[key]
159+
end
160+
end
161+
162+
result = Graphiti.broadcast(:global_thread_pool_task_run, self.class.global_thread_pool_stats) do
163+
yield(*args)
164+
end
165+
166+
if execution_context_changed
167+
thread_storage&.keys&.each { |key| Thread.current[key] = nil }
168+
fiber_storage&.keys&.each { |key| Fiber[key] = nil }
169+
end
170+
171+
result
172+
end
173+
end
174+
end
175+
176+
def wrap_in_rails_executor(&block)
177+
if defined?(::Rails.application.executor)
178+
::Rails.application.executor.wrap(&block)
179+
else
180+
yield
181+
end
182+
end
183+
111184
def sideload_resource_proxies
112185
@sideload_resource_proxies ||= begin
113186
@object = @resource.before_resolve(@object, @query)

lib/graphiti/sideload.rb

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,7 @@ def build_resource_proxy(parents, query, graph_parent)
236236
end
237237

238238
def load(parents, query, graph_parent)
239-
build_resource_proxy(parents, query, graph_parent).to_a
239+
future_load(parents, query, graph_parent).value!
240240
end
241241

242242
# Override in subclass
@@ -286,7 +286,7 @@ def assign(parents, children)
286286
children.replace(associated) if track_associated
287287
end
288288

289-
def resolve(parents, query, graph_parent)
289+
def future_resolve(parents, query, graph_parent)
290290
if single? && parents.length > 1
291291
raise Errors::SingularSideload.new(self, parents.length)
292292
end
@@ -300,11 +300,11 @@ def resolve(parents, query, graph_parent)
300300
sideload: self,
301301
sideload_parent_length: parents.length,
302302
default_paginate: false
303-
sideload_scope.resolve do |sideload_results|
303+
sideload_scope.future_resolve do |sideload_results|
304304
fire_assign(parents, sideload_results)
305305
end
306306
else
307-
load(parents, query, graph_parent)
307+
future_load(parents, query, graph_parent)
308308
end
309309
end
310310

@@ -368,6 +368,11 @@ def resource_class_loaded?
368368

369369
private
370370

371+
def future_load(parents, query, graph_parent)
372+
proxy = build_resource_proxy(parents, query, graph_parent)
373+
proxy.respond_to?(:future_resolve_data) ? proxy.future_resolve_data : Concurrent::Promises.fulfilled_future(proxy)
374+
end
375+
371376
def blank_query?(params)
372377
if (filter = params[:filter])
373378
if filter.values == [""]

lib/graphiti/sideload/polymorphic_belongs_to.rb

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,18 +108,23 @@ def child_for_type!(type)
108108
end
109109

110110
def resolve(parents, query, graph_parent)
111-
parents.group_by(&grouper.field_name).each_pair do |group_name, group|
111+
future_resolve(parents, query, graph_parent).value!
112+
end
113+
114+
def future_resolve(parents, query, graph_parent)
115+
promises = parents.group_by(&grouper.field_name).filter_map do |(group_name, group)|
112116
next if group_name.nil? || grouper.ignore?(group_name)
113117

114118
match = ->(c) { c.group_name == group_name.to_sym }
115119
if (sideload = children.values.find(&match))
116120
duped = remove_invalid_sideloads(sideload.resource, query)
117-
sideload.resolve(group, duped, graph_parent)
121+
sideload.future_resolve(group, duped, graph_parent)
118122
else
119123
err = ::Graphiti::Errors::PolymorphicSideloadChildNotFound
120124
raise err.new(self, group_name)
121125
end
122126
end
127+
Concurrent::Promises.zip(*promises)
123128
end
124129

125130
private

spec/configuration_spec.rb

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,21 @@
150150
end
151151
end
152152

153+
describe "#concurrency_max_threads" do
154+
include_context "with config", :concurrency_max_threads
155+
156+
it "defaults" do
157+
expect(Graphiti.config.concurrency_max_threads).to eq(4)
158+
end
159+
160+
it "is overridable" do
161+
Graphiti.configure do |c|
162+
c.concurrency_max_threads = 1
163+
end
164+
expect(Graphiti.config.concurrency_max_threads).to eq(1)
165+
end
166+
end
167+
153168
describe "#raise_on_missing_sideload" do
154169
include_context "with config", :raise_on_missing_sideload
155170

0 commit comments

Comments
 (0)