Skip to content

Commit 8b2f97b

Browse files
authored
[Feature] Implement Payload Codec interface and integrate it into the DataConverter (#50)
1 parent 108dceb commit 8b2f97b

File tree

5 files changed

+245
-10
lines changed

5 files changed

+245
-10
lines changed

lib/temporal/data_converter.rb

Lines changed: 41 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,40 +1,73 @@
1+
require 'temporal/api/common/v1/message_pb'
2+
require 'temporal/errors'
13
require 'temporal/payload_converter'
24

35
module Temporal
46
class DataConverter
5-
def initialize(payload_converter: Temporal::PayloadConverter::DEFAULT)
7+
class MissingPayload < Temporal::Error; end
8+
9+
def initialize(payload_converter: Temporal::PayloadConverter::DEFAULT, payload_codecs: [])
610
@payload_converter = payload_converter
11+
@payload_codecs = payload_codecs
712
end
813

914
def to_payloads(data)
1015
return if data.nil? || Array(data).empty?
1116

1217
payloads = Array(data).map { |value| to_payload(value) }
13-
Temporal::Api::Common::V1::Payloads.new(payloads: payloads)
18+
Temporal::Api::Common::V1::Payloads.new(payloads: encode(payloads))
1419
end
1520

1621
def to_payload_map(data)
17-
data.to_h { |key, value| [key.to_s, to_payload(value)] }
22+
data.to_h do |key, value|
23+
payload = to_payload(value)
24+
encoded_payload = encode([payload]).first
25+
raise MissingPayload, 'Payload Codecs returned no payloads' unless encoded_payload
26+
27+
[key.to_s, encoded_payload]
28+
end
1829
end
1930

2031
def from_payloads(payloads)
2132
return unless payloads
2233

23-
payloads.payloads.map { |payload| from_payload(payload) }
34+
decode(payloads.payloads)
35+
.map { |payload| from_payload(payload) }
2436
end
2537

2638
def from_payload_map(payload_map)
2739
return unless payload_map
2840

2941
# Protobuf's Hash isn't compatible with the native Hash, ignore rubocop here
30-
# rubocop:disable Style/HashTransformValues, Style/MapToHash
31-
payload_map.map { |key, value| [key, from_payload(value)] }.to_h
32-
# rubocop:enable Style/HashTransformValues, Style/MapToHash
42+
# rubocop:disable Style/MapToHash
43+
payload_map.map do |key, payload|
44+
decoded_payload = decode([payload]).first
45+
raise MissingPayload, 'Payload Codecs returned no payloads' unless decoded_payload
46+
47+
[key, from_payload(decoded_payload)]
48+
end.to_h
49+
# rubocop:enable Style/MapToHash
3350
end
3451

3552
private
3653

37-
attr_reader :payload_converter
54+
attr_reader :payload_converter, :payload_codecs
55+
56+
def encode(payloads)
57+
payload_codecs.each do |codec|
58+
payloads = codec.encode(payloads)
59+
end
60+
61+
payloads
62+
end
63+
64+
def decode(payloads)
65+
payload_codecs.reverse_each do |codec|
66+
payloads = codec.decode(payloads)
67+
end
68+
69+
payloads
70+
end
3871

3972
def to_payload(data)
4073
payload_converter.to_payload(data)

lib/temporal/payload_codec/base.rb

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
module Temporal
2+
module PayloadCodec
3+
class Base
4+
def encode(_payloads)
5+
raise NoMethodError, 'must implement #encode'
6+
end
7+
8+
def decode(_payloads)
9+
raise NoMethodError, 'must implement #decode'
10+
end
11+
end
12+
end
13+
end

sig/temporal/data_converter.rbs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,12 @@
11
module Temporal
22
class DataConverter
3-
def initialize: (?payload_converter: Temporal::PayloadConverter::_PayloadConverter) -> void
3+
class MissingPayload < Temporal::Error
4+
end
5+
6+
def initialize: (
7+
?payload_converter: Temporal::PayloadConverter::_PayloadConverter,
8+
?payload_codecs: Array[Temporal::PayloadCodec::_PayloadCodec],
9+
) -> void
410
def to_payloads: (Array[untyped]? data) -> Temporal::Api::Common::V1::Payloads?
511
def to_payload_map: (Hash[String | Symbol, untyped] data) -> Hash[String, Temporal::Api::Common::V1::Payload]
612
def from_payloads: (Temporal::Api::Common::V1::Payloads? payloads) -> Array[untyped]?
@@ -9,7 +15,10 @@ module Temporal
915
private
1016

1117
attr_reader payload_converter: Temporal::PayloadConverter::_PayloadConverter
18+
attr_reader payload_codecs: Array[Temporal::PayloadCodec::_PayloadCodec]
1219

20+
def encode: (Array[Temporal::Api::Common::V1::Payload]) -> Array[Temporal::Api::Common::V1::Payload]
21+
def decode: (Array[Temporal::Api::Common::V1::Payload]) -> Array[Temporal::Api::Common::V1::Payload]
1322
def to_payload: (untyped data) -> Temporal::Api::Common::V1::Payload
1423
def from_payload: (Temporal::Api::Common::V1::Payload payload) -> untyped
1524
end

sig/temporal/payload_codec/base.rbs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
module Temporal
2+
module PayloadCodec
3+
interface _PayloadCodec
4+
def encode: (Array[Temporal::Api::Common::V1::Payload]) -> Array[Temporal::Api::Common::V1::Payload]
5+
def decode: (Array[Temporal::Api::Common::V1::Payload]) -> Array[Temporal::Api::Common::V1::Payload]
6+
end
7+
8+
class Base
9+
include _PayloadCodec
10+
end
11+
end
12+
end

spec/unit/temporal/data_converter_spec.rb

Lines changed: 169 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,79 @@
1+
require 'temporal/api/common/v1/message_pb'
12
require 'temporal/data_converter'
23
require 'temporal/payload_converter'
4+
require 'temporal/payload_codec/base'
5+
6+
class TestConcatenatingPayloadCodec < Temporal::PayloadCodec::Base
7+
ENCODING = 'mixed'.freeze
8+
9+
def initialize(separator)
10+
super()
11+
@separator = separator
12+
end
13+
14+
def encode(payloads)
15+
data = payloads.map do |payload|
16+
Temporal::Api::Common::V1::Payload.encode(payload)
17+
end.join(separator)
18+
19+
return [] if data.empty?
20+
21+
[Temporal::Api::Common::V1::Payload.new(
22+
metadata: { encoding: ENCODING },
23+
data: data
24+
)]
25+
end
26+
27+
def decode(payloads)
28+
return if payloads.empty?
29+
raise 'unexpected number of payloads' if payloads.length > 1
30+
31+
payloads.first.data.split(separator).map do |bytes|
32+
Temporal::Api::Common::V1::Payload.decode(bytes)
33+
end
34+
end
35+
36+
private
37+
38+
attr_reader :separator
39+
end
40+
41+
class TestFaultyPayloadCodec < Temporal::PayloadCodec::Base
42+
def encode(_payloads)
43+
[]
44+
end
45+
46+
def decode(_payloads)
47+
[]
48+
end
49+
end
50+
51+
# This codec doesn't do much, but it ensures that codecs are applied in the correct order
52+
class TestEncodingSwappingPayloadCodec < Temporal::PayloadCodec::Base
53+
FROM_ENCODING = TestConcatenatingPayloadCodec::ENCODING
54+
TO_ENCODING = 'swapped'.freeze
55+
56+
def encode(payloads)
57+
payload = payloads.first
58+
raise 'unexpected payload' unless payload.metadata['encoding'] == FROM_ENCODING
59+
60+
payload.metadata['encoding'] = TO_ENCODING
61+
[payload]
62+
end
63+
64+
def decode(payloads)
65+
payload = payloads.first
66+
raise 'unexpected payload' unless payload.metadata['encoding'] == TO_ENCODING
67+
68+
payload.metadata['encoding'] = FROM_ENCODING
69+
[payload]
70+
end
71+
end
372

473
describe Temporal::DataConverter do
5-
subject { described_class.new(payload_converter: converter) }
74+
subject { described_class.new(payload_converter: converter, payload_codecs: codecs) }
675
let(:converter) { Temporal::PayloadConverter::DEFAULT }
76+
let(:codecs) { [] }
777
let(:nil_payload) do
878
Temporal::Api::Common::V1::Payload.new(
979
metadata: { 'encoding' => Temporal::PayloadConverter::Nil::ENCODING },
@@ -15,10 +85,15 @@
1585
data: '"test"',
1686
)
1787
end
88+
let(:test_codec) { TestConcatenatingPayloadCodec.new('$$$') }
89+
let(:faulty_codec) { TestFaultyPayloadCodec.new }
90+
let(:swap_codec) { TestEncodingSwappingPayloadCodec.new }
1891

1992
before do
2093
allow(converter).to receive(:to_payload).and_call_original
2194
allow(converter).to receive(:from_payload).and_call_original
95+
allow(test_codec).to receive(:encode).and_call_original
96+
allow(test_codec).to receive(:decode).and_call_original
2297
end
2398

2499
describe '#to_payloads' do
@@ -45,6 +120,21 @@
45120
expect(converter).to have_received(:to_payload).with('test').once
46121
expect(converter).to have_received(:to_payload).with(nil).once
47122
end
123+
124+
context 'with payload codecs' do
125+
let(:codecs) { [test_codec] }
126+
127+
it 'encodes the payloads' do
128+
result = subject.to_payloads(['test', nil])
129+
130+
expect(result).to be_a(Temporal::Api::Common::V1::Payloads)
131+
expect(result.payloads.length).to eq(1)
132+
expect(result.payloads.first.metadata['encoding'])
133+
.to eq(TestConcatenatingPayloadCodec::ENCODING)
134+
135+
expect(test_codec).to have_received(:encode).once
136+
end
137+
end
48138
end
49139

50140
describe '#to_payload_map' do
@@ -64,6 +154,30 @@
64154
expect(converter).to have_received(:to_payload).with('test').once
65155
expect(converter).to have_received(:to_payload).with(nil).once
66156
end
157+
158+
context 'with payload codecs' do
159+
let(:codecs) { [test_codec] }
160+
161+
it 'encodes each payload' do
162+
result = subject.to_payload_map({ 'one' => 'test', 'two' => nil })
163+
164+
expect(result.length).to eq(2)
165+
expect(result['one'].metadata['encoding']).to eq(TestConcatenatingPayloadCodec::ENCODING)
166+
expect(result['two'].metadata['encoding']).to eq(TestConcatenatingPayloadCodec::ENCODING)
167+
168+
expect(test_codec).to have_received(:encode).twice
169+
end
170+
end
171+
172+
context 'with a faulty codec' do
173+
let(:codecs) { [test_codec, faulty_codec] }
174+
175+
it 'raises an error' do
176+
expect do
177+
subject.to_payload_map({ 'one' => 'test', 'two' => nil })
178+
end.to raise_error(described_class::MissingPayload, 'Payload Codecs returned no payloads')
179+
end
180+
end
67181
end
68182

69183
describe '#from_payloads' do
@@ -79,9 +193,26 @@
79193
expect(converter).to have_received(:from_payload).with(json_payload).once
80194
expect(converter).to have_received(:from_payload).with(nil_payload).once
81195
end
196+
197+
context 'with payload codecs' do
198+
let(:codecs) { [test_codec] }
199+
200+
it 'decodecs the payloads' do
201+
mixed_payloads = test_codec.encode([json_payload, nil_payload])
202+
payloads = Temporal::Api::Common::V1::Payloads.new(payloads: mixed_payloads)
203+
204+
result = subject.from_payloads(payloads)
205+
206+
expect(result).to eq(['test', nil])
207+
expect(test_codec).to have_received(:decode).once
208+
end
209+
end
82210
end
83211

84212
describe '#from_payload_map' do
213+
let(:json_encoded) { test_codec.encode([json_payload]).first }
214+
let(:nil_encoded) { test_codec.encode([nil_payload]).first }
215+
85216
it 'returns nil when nothing is given' do
86217
expect(subject.from_payload_map(nil)).to eq(nil)
87218
end
@@ -93,6 +224,27 @@
93224
expect(converter).to have_received(:from_payload).with(json_payload).once
94225
expect(converter).to have_received(:from_payload).with(nil_payload).once
95226
end
227+
228+
context 'with payload codecs' do
229+
let(:codecs) { [test_codec] }
230+
231+
it 'decodecs each payload' do
232+
result = subject.from_payload_map({ 'one' => json_encoded, 'two' => nil_encoded })
233+
234+
expect(result).to eq({ 'one' => 'test', 'two' => nil })
235+
expect(test_codec).to have_received(:decode).twice
236+
end
237+
end
238+
239+
context 'with a faulty codec' do
240+
let(:codecs) { [faulty_codec, test_codec] }
241+
242+
it 'raises an error' do
243+
expect do
244+
subject.from_payload_map({ 'one' => json_encoded, 'two' => nil_encoded })
245+
end.to raise_error(described_class::MissingPayload, 'Payload Codecs returned no payloads')
246+
end
247+
end
96248
end
97249

98250
describe 'full circle' do
@@ -113,5 +265,21 @@
113265

114266
expect(subject.from_payload_map(subject.to_payload_map(input))).to eq(input)
115267
end
268+
269+
context 'with payload codecs' do
270+
let(:codecs) { [test_codec, swap_codec] }
271+
272+
it 'converts values to payloads and back' do
273+
input = ['test', nil]
274+
275+
expect(subject.from_payloads(subject.to_payloads(input))).to eq(input)
276+
end
277+
278+
it 'converts values map to payloads map and back' do
279+
input = { 'one' => 'test', 'two' => nil }
280+
281+
expect(subject.from_payload_map(subject.to_payload_map(input))).to eq(input)
282+
end
283+
end
116284
end
117285
end

0 commit comments

Comments
 (0)