Skip to content

Commit 40055d3

Browse files
authored
Client improvements (#69)
* Replace generic errors with specific ones in Client::Implementation * Serialize RetryPolicy for #start_workflow * Add missing types for Thermite::Config and Rutie * Require schema-less URLs when initializing a Connection * Raise specific error when workflow is already started * Switch UnsupportedQuery to QueryFailed error
1 parent 1355f62 commit 40055d3

14 files changed

+149
-38
lines changed

Steepfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ target :lib do
66
check 'lib'
77

88
repo_path 'vendor/rbs/gem_rbs_collection/gems'
9-
library 'protobuf', 'json', 'securerandom', 'socket'
9+
library 'protobuf', 'json', 'securerandom', 'socket', 'uri'
1010

1111
ignore 'lib/gen/*.rb'
1212

lib/temporal/client/implementation.rb

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ def handle_start_workflow(input)
119119
workflow_run_timeout: input.run_timeout,
120120
workflow_task_timeout: input.task_timeout,
121121
workflow_id_reuse_policy: Workflow::IDReusePolicy.to_raw(input.id_reuse_policy),
122-
retry_policy: nil, # TODO: serialize retry policy
122+
retry_policy: input.retry_policy&.to_proto,
123123
cron_schedule: input.cron_schedule,
124124
memo: memo,
125125
search_attributes: search_attributes,
@@ -154,8 +154,7 @@ def handle_start_workflow(input)
154154
rescue Temporal::Bridge::Error => e
155155
# TODO: Raise a better error from the bridge
156156
if e.message.include?('AlreadyExists')
157-
# TODO: Replace with a more specific error
158-
raise Temporal::Error, 'Workflow already exists'
157+
raise Temporal::Error::WorkflowExecutionAlreadyStarted, 'Workflow execution already started'
159158
else
160159
raise # re-raise
161160
end
@@ -196,17 +195,13 @@ def handle_query_workflow(input)
196195

197196
if response.query_rejected
198197
status = Workflow::ExecutionStatus.from_raw(response.query_rejected.status)
199-
# TODO: Replace with a specific error when implemented
200-
raise Temporal::Error, "Query rejected, workflow status: #{status}"
198+
raise Temporal::Error::QueryRejected, status
201199
end
202200

203201
converter.from_payloads(response.query_result)&.first
204202
rescue Temporal::Bridge::Error => e
205203
# TODO: Raise a better error from the bridge
206-
if e.message.include?('unknown queryType')
207-
# TODO: Replace with a more specific error
208-
raise Temporal::Error, 'Unsupported query'
209-
end
204+
raise Temporal::Error::QueryFailed, e.message
210205
end
211206

212207
def handle_signal_workflow(input)

lib/temporal/connection.rb

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
require 'temporal/api/workflowservice/v1/request_response_pb'
22
require 'temporal/bridge'
33
require 'temporal/runtime'
4+
require 'uri'
45

56
module Temporal
67
# A connection to the Temporal server.
@@ -13,8 +14,9 @@ class Connection
1314
# @param host [String] `host:port` for the Temporal server. For local development, this is
1415
# often `"localhost:7233"`.
1516
def initialize(host)
17+
url = parse_url(host)
1618
runtime = Temporal::Runtime.instance
17-
@core_connection = Temporal::Bridge::Connection.connect(runtime.core_runtime, host)
19+
@core_connection = Temporal::Bridge::Connection.connect(runtime.core_runtime, url)
1820
end
1921

2022
# @param request [Temporal::Api::WorkflowService::V1::RegisterNamespaceRequest]
@@ -718,5 +720,17 @@ def list_batch_operations(request, metadata: {}, timeout: nil)
718720

719721
Temporal::Api::WorkflowService::V1::ListBatchOperationsResponse.decode(response)
720722
end
723+
724+
private
725+
726+
def parse_url(url)
727+
# Turn this into a valid URI before parsing
728+
uri = URI.parse(url.include?('://') ? url : "//#{url}")
729+
raise Temporal::Error, 'Target host as URL with scheme are not supported' if uri.scheme
730+
731+
# TODO: Add support for mTLS
732+
uri.scheme = 'http'
733+
uri.to_s
734+
end
721735
end
722736
end

lib/temporal/errors.rb

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,20 @@ module Temporal
33
class Error < StandardError
44
# Superclass for RPC and proto related errors
55
class RPCError < Error; end
6+
67
class UnexpectedResponse < RPCError; end
8+
9+
class WorkflowExecutionAlreadyStarted < RPCError; end
10+
11+
class QueryFailed < RPCError; end
12+
13+
class QueryRejected < RPCError
14+
attr_reader :status
15+
16+
def initialize(status)
17+
super("Query rejected, workflow status: #{status}")
18+
@status = status
19+
end
20+
end
721
end
822
end

lib/temporal/retry_policy.rb

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
require 'temporal/api/common/v1/message_pb'
12
require 'temporal/errors'
23

34
module Temporal
@@ -67,5 +68,15 @@ def validate!
6768
raise Invalid, 'Maximum interval cannot be less than initial interval' if max_interval < initial_interval
6869
end
6970
end
71+
72+
def to_proto
73+
Temporal::Api::Common::V1::RetryPolicy.new(
74+
initial_interval: Google::Protobuf::Duration.new(seconds: initial_interval),
75+
backoff_coefficient: backoff,
76+
maximum_interval: max_interval ? Google::Protobuf::Duration.new(seconds: max_interval) : nil,
77+
maximum_attempts: max_attempts,
78+
non_retryable_error_types: non_retriable_errors.map(&:name).compact,
79+
)
80+
end
7081
end
7182
end

sig/temporal/bridge.rbs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,12 @@
11
BRIDGE_DIR: String
22

33
class Rutie
4-
def initialize: (Symbol, release: String | Symbol) -> void
4+
def initialize: (
5+
Symbol bridge,
6+
?lib_path: String | Symbol,
7+
?lib_suffix: String | Symbol,
8+
?lib_prefix: String | Symbol
9+
) -> void
510
def init: (String, String) -> void
611
end
712

sig/temporal/connection.rbs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,5 +43,9 @@ module Temporal
4343
def get_cluster_info: (Temporal::Api::WorkflowService::V1::GetClusterInfoRequest request, ?metadata: Hash[String, String], ?timeout: Integer?) -> Temporal::Api::WorkflowService::V1::GetClusterInfoResponse
4444
def get_system_info: (Temporal::Api::WorkflowService::V1::GetSystemInfoRequest request, ?metadata: Hash[String, String], ?timeout: Integer?) -> Temporal::Api::WorkflowService::V1::GetSystemInfoResponse
4545
def list_task_queue_partitions: (Temporal::Api::WorkflowService::V1::ListTaskQueuePartitionsRequest request, ?metadata: Hash[String, String], ?timeout: Integer?) -> Temporal::Api::WorkflowService::V1::ListTaskQueuePartitionsResponse
46+
47+
private
48+
49+
def parse_url: (String) -> String
4650
end
4751
end

sig/temporal/errors.rbs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,5 +5,17 @@ module Temporal
55

66
class UnexpectedResponse < RPCError
77
end
8+
9+
class WorkflowExecutionAlreadyStarted < RPCError
10+
end
11+
12+
class QueryFailed < RPCError
13+
end
14+
15+
class QueryRejected < RPCError
16+
attr_reader status: Temporal::Workflow::ExecutionStatus::values
17+
18+
def initialize: (Temporal::Workflow::ExecutionStatus::values) -> void
19+
end
820
end
921
end

sig/temporal/retry_policy.rbs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,23 @@
11
module Temporal
2-
class Error < StandardError
3-
end
4-
52
class RetryPolicy
63
Invalid: Temporal::Error
74

85
attr_reader initial_interval: Integer
96
attr_reader backoff: Float
107
attr_reader max_interval: Integer?
118
attr_reader max_attempts: Integer
12-
attr_reader non_retriable_errors: Array[StandardError]
9+
attr_reader non_retriable_errors: Array[Class]
1310

1411
def initialize: (
1512
?initial_interval: Integer,
1613
?backoff: Float,
1714
?max_interval: nil,
1815
?max_attempts: Integer,
19-
?non_retriable_errors: Array[StandardError]
16+
?non_retriable_errors: Array[Class]
2017
) -> void
2118

2219
def validate!: -> nil
20+
21+
def to_proto: -> Temporal::Api::Common::V1::RetryPolicy
2322
end
2423
end

sig/thermite_patch.rbs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
module Thermite
2+
class Config
3+
def target_arch: -> String
4+
def target_os: -> String
5+
def cargo_target_path: (String, *String) -> String
6+
def rust_toplevel_dir: -> String
7+
8+
private
9+
10+
@target_arch: String?
11+
@target_os: String?
12+
end
13+
end

spec/integration/client_spec.rb

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -54,13 +54,13 @@ def terminate_workflow(input)
5454

5555
subject { described_class.new(connection, namespace) }
5656

57-
let(:connection) { Temporal::Connection.new("http://#{url}") }
57+
let(:connection) { Temporal::Connection.new(url) }
5858
let(:id) { SecureRandom.uuid }
5959
let(:workflow) { 'kitchen_sink' }
6060

6161
before(:all) do
6262
@server_pid = fork { exec("#{support_path}/go_server/main #{port} #{namespace}") }
63-
Helpers::TestRPC.wait("http://#{url}", 10, 0.5)
63+
Helpers::TestRPC.wait(url, 10, 0.5)
6464

6565
@worker_pid = fork { exec("#{support_path}/go_worker/main #{url} #{namespace} #{task_queue}") }
6666
end
@@ -96,7 +96,10 @@ def terminate_workflow(input)
9696
task_queue: task_queue,
9797
id_reuse_policy: Temporal::Workflow::IDReusePolicy::REJECT_DUPLICATE,
9898
)
99-
end.to raise_error(Temporal::Error, 'Workflow already exists')
99+
end.to raise_error(
100+
Temporal::Error::WorkflowExecutionAlreadyStarted,
101+
'Workflow execution already started'
102+
)
100103

101104
# Run with a default policy again expecting it to succeed
102105
input = { actions: [{ result: { value: 'test return value 3' } }] }
@@ -184,10 +187,10 @@ def terminate_workflow(input)
184187
handle = subject.start_workflow(workflow, input, id: id, task_queue: task_queue)
185188
handle.result
186189

187-
# TODO: Change this to a wrapped error later
188-
expect do
189-
handle.query('other test query', 'test query arg')
190-
end.to raise_error(Temporal::Error, 'Unsupported query')
190+
expect { handle.query('other test query', 'test query arg') }.to raise_error do |error|
191+
expect(error).to be_a(Temporal::Error::QueryFailed)
192+
expect(error.message).to include('unknown queryType other test query')
193+
end
191194
end
192195
end
193196

spec/integration/connection_spec.rb

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,13 @@
66
describe Temporal::Connection do
77
mock_address = '0.0.0.0:4444'.freeze
88

9-
subject { described_class.new("http://#{mock_address}") }
9+
subject { described_class.new(mock_address) }
1010

1111
# TODO: For some reason the Bridge doesn't play well with the server in the same
1212
# process throwing SegFaults in cases. Needs further investigation
1313
before(:all) do
1414
@pid = fork { exec('bundle exec ruby spec/support/mock_server.rb') }
15-
Helpers::TestRPC.wait("http://#{mock_address}", 10)
15+
Helpers::TestRPC.wait(mock_address, 10)
1616
end
1717
after(:all) { Process.kill('QUIT', @pid) }
1818

@@ -49,7 +49,13 @@
4949
end
5050

5151
it 'raises when unable to connect' do
52-
expect { described_class.new('http://0.0.0.0:3333') }.to raise_error(Temporal::Bridge::Error)
52+
expect { described_class.new('0.0.0.0:3333') }.to raise_error(Temporal::Bridge::Error)
53+
end
54+
55+
it 'raises when given a URL with schema' do
56+
expect do
57+
described_class.new('http://localhost:3333')
58+
end.to raise_error(Temporal::Error, 'Target host as URL with scheme are not supported')
5359
end
5460

5561
it 'raises when incorrect request was provided' do

spec/unit/temporal/client/implementation_spec.rb

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,7 @@
209209
it 'raises Temporal::Error' do
210210
expect do
211211
subject.start_workflow(input)
212-
end.to raise_error(Temporal::Error, 'Workflow already exists')
212+
end.to raise_error(Temporal::Error::WorkflowExecutionAlreadyStarted, 'Workflow execution already started')
213213
end
214214
end
215215

@@ -360,6 +360,26 @@
360360
expect(subject.query_workflow(input)).to eq('test')
361361
end
362362

363+
context 'when query is rejected' do
364+
before do
365+
allow(connection)
366+
.to receive(:query_workflow)
367+
.and_return(
368+
Temporal::Api::WorkflowService::V1::QueryWorkflowResponse.new(
369+
query_rejected: { status: :WORKFLOW_EXECUTION_STATUS_COMPLETED },
370+
)
371+
)
372+
end
373+
374+
it 'raises an error' do
375+
expect { subject.query_workflow(input) }.to raise_error do |error|
376+
expect(error).to be_a(Temporal::Error::QueryRejected)
377+
expect(error.message).to eq('Query rejected, workflow status: COMPLETED')
378+
expect(error.status).to eq(Temporal::Workflow::ExecutionStatus::COMPLETED)
379+
end
380+
end
381+
end
382+
363383
context 'with header' do
364384
before { input.headers = { 'header' => 'test' } }
365385

spec/unit/temporal/retry_policy_spec.rb

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,21 @@
11
require 'temporal/retry_policy'
22

33
describe Temporal::RetryPolicy do
4+
subject { described_class.new(**valid_attributes) }
5+
6+
let(:valid_attributes) do
7+
{
8+
initial_interval: 1,
9+
backoff: 1.5,
10+
max_interval: 5,
11+
max_attempts: 3,
12+
non_retriable_errors: [StandardError],
13+
}
14+
end
15+
416
describe '#validate!' do
517
subject { described_class.new(**attributes) }
618

7-
let(:valid_attributes) do
8-
{
9-
initial_interval: 1,
10-
backoff: 1.5,
11-
max_interval: 5,
12-
max_attempts: 3,
13-
non_retriable_errors: [StandardError],
14-
}
15-
end
16-
1719
let(:unlimited_attempts) do
1820
{
1921
initial_interval: 1,
@@ -118,4 +120,17 @@
118120
end
119121
end
120122
end
123+
124+
describe '#to_proto' do
125+
it 'serializes itself into proto' do
126+
proto = subject.to_proto
127+
128+
expect(proto).to be_a(Temporal::Api::Common::V1::RetryPolicy)
129+
expect(proto.initial_interval.seconds).to eq(subject.initial_interval)
130+
expect(proto.backoff_coefficient).to eq(subject.backoff)
131+
expect(proto.maximum_interval.seconds).to eq(subject.max_interval)
132+
expect(proto.maximum_attempts).to eq(subject.max_attempts)
133+
expect(proto.non_retryable_error_types.to_a).to eq(['StandardError'])
134+
end
135+
end
121136
end

0 commit comments

Comments
 (0)