Skip to content
This repository was archived by the owner on May 27, 2022. It is now read-only.

Commit c54faa8

Browse files
authored
New pb (#11)
* Adapting to interface changes of new pb codec * Cleaning up dependencies and lint
1 parent 2fe7e04 commit c54faa8

File tree

4 files changed

+64
-72
lines changed

4 files changed

+64
-72
lines changed

rebar.config

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -43,12 +43,11 @@
4343
#{regex => "^[a-z]([a-z0-9]*_?)*(_SUITE)?$",
4444
ignore => []}
4545
},
46-
% Can be added back if antidote_crdt_bcounter:localPermissions is renamed
47-
%{
48-
% elvis_style,
49-
% function_naming_convention,
50-
% #{regex => "^([a-z][a-z0-9]*_?)*$"}
51-
%},
46+
{
47+
elvis_style,
48+
function_naming_convention,
49+
#{regex => "^([a-z][a-z0-9]*_?)*$"}
50+
},
5251
{elvis_style, state_record_and_type},
5352
{elvis_style, no_spec_with_records}
5453
]

rebar.lock

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{"1.1.0",
2-
[{<<"antidote_pb_codec">>,{pkg,<<"antidote_pb_codec">>,<<"0.0.5">>},0}]}.
2+
[{<<"antidote_pb_codec">>,{pkg,<<"antidote_pb_codec">>,<<"0.1.0">>},0}]}.
33
[
44
{pkg_hash,[
5-
{<<"antidote_pb_codec">>, <<"139F291D7E4971DE3920E51D09BD10931AF957656ED6F7D9935B3EA059D88167">>}]}
5+
{<<"antidote_pb_codec">>, <<"D8CC2D69BD25B3961ADECE20954FCB66719AA1DF1D027CF58389C8B7B6EFC739">>}]}
66
].

src/antidotec_pb.erl

Lines changed: 44 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -32,61 +32,68 @@
3232

3333
-define(TIMEOUT, 10000).
3434

35-
-spec start_transaction(Pid::term(), TimeStamp::term())
35+
-spec start_transaction(Pid::pid(), TimeStamp::binary() | ignore)
3636
-> {ok, {interactive, term()} | {static, {term(), term()}}} | {error, term()}.
3737
start_transaction(Pid, TimeStamp) ->
3838
start_transaction(Pid, TimeStamp, []).
3939

40-
-spec start_transaction(Pid::term(), TimeStamp::term(), TxnProperties::term())
41-
-> {ok, {interactive, term()} | {static, {term(), term()}}} | {error, term()}.
40+
-spec start_transaction(Pid::pid(), TimeStamp::binary() | ignore, TxnProperties::term())
41+
-> {ok, {interactive, binary()} | {static, {binary(), term()}}} | {error, term()}.
4242
start_transaction(Pid, TimeStamp, TxnProperties) ->
43+
EncTimestamp = case TimeStamp of
44+
ignore -> term_to_binary(ignore);
45+
Binary -> Binary
46+
end,
4347
case is_static(TxnProperties) of
4448
true ->
45-
{ok, {static, {TimeStamp, TxnProperties}}};
49+
{ok, {static, {EncTimestamp, TxnProperties}}};
4650
false ->
47-
EncMsg = antidote_pb_codec:encode(start_transaction,
48-
{TimeStamp, TxnProperties}),
51+
EncMsg = antidote_pb_codec:encode_request({start_transaction, EncTimestamp, TxnProperties}),
4952
Result = antidotec_pb_socket:call_infinity(Pid, {req, EncMsg, ?TIMEOUT}),
5053
case Result of
5154
{error, timeout} ->
5255
{error, timeout};
5356
_ ->
5457
case antidote_pb_codec:decode_response(Result) of
55-
{start_transaction, TxId} ->
58+
{start_transaction_response, {ok, TxId}} ->
5659
{ok, {interactive, TxId}};
57-
{error, Reason} ->
60+
{start_transaction_response, {error, Reason}} ->
61+
{error, Reason};
62+
{error_response, Reason} ->
5863
{error, Reason};
5964
Other ->
6065
{error, Other}
6166
end
6267
end
6368
end.
6469

65-
-spec abort_transaction(Pid::term(), TxId::term()) -> ok | {error, term()}.
70+
-spec abort_transaction(Pid::pid(), {interactive, TxId::binary()}) -> ok | {error, term()}.
6671
abort_transaction(Pid, {interactive, TxId}) ->
67-
EncMsg = antidote_pb_codec:encode(abort_transaction, TxId),
72+
EncMsg = antidote_pb_codec:encode_request({abort_transaction, TxId}),
6873
Result = antidotec_pb_socket:call_infinity(Pid, {req, EncMsg, ?TIMEOUT}),
6974
case Result of
7075
{error, timeout} -> {error, timeout};
7176
_ ->
7277
case antidote_pb_codec:decode_response(Result) of
73-
{opresponse, ok} -> ok;
74-
{error, Reason} -> {error, Reason};
78+
{operation_response, ok} -> ok;
79+
{operation_response, {error, Reason}} -> {error, Reason};
80+
{error_response, Reason} -> {error, Reason};
7581
Other -> {error, Other}
7682
end
7783
end.
7884

79-
-spec commit_transaction(Pid::term(), TxId::{interactive, term()} | {static, term()}) ->
80-
{ok, term()} | {error, term()}.
85+
-spec commit_transaction(Pid::pid(), TxId::{interactive, binary()} | {static, binary()}) ->
86+
{ok, binary()} | {error, term()}.
8187
commit_transaction(Pid, {interactive, TxId}) ->
82-
EncMsg = antidote_pb_codec:encode(commit_transaction, TxId),
88+
EncMsg = antidote_pb_codec:encode_request({commit_transaction, TxId}),
8389
Result = antidotec_pb_socket:call_infinity(Pid, {req, EncMsg, ?TIMEOUT}),
8490
case Result of
8591
{error, timeout} -> {error, timeout};
8692
_ ->
8793
case antidote_pb_codec:decode_response(Result) of
88-
{commit_transaction, CommitTimeStamp} -> {ok, CommitTimeStamp};
89-
{error, Reason} -> {error, Reason};
94+
{commit_response, {ok, CommitTimeStamp}} -> {ok, CommitTimeStamp};
95+
{commit_response, {error, Reason}} -> {error, Reason};
96+
{error_response, Reason} -> {error, Reason};
9097
Other -> {error, Other}
9198
end
9299
end;
@@ -96,38 +103,40 @@ commit_transaction(Pid, {static, _TxId}) ->
96103
{ok, CommitTime}
97104
end.
98105

99-
-spec update_objects(Pid::term(), Updates::[{term(), term(), term()}], TxId::term()) -> ok | {error, term()}.
106+
-spec update_objects(Pid::pid(), Updates::[{term(), term(), term()}], {interactive | static, TxId::binary()}) -> ok | {error, term()}.
100107
update_objects(Pid, Updates, {interactive, TxId}) ->
101-
EncMsg = antidote_pb_codec: encode(update_objects, {Updates, TxId}),
108+
EncMsg = antidote_pb_codec: encode_request({update_objects, Updates, TxId}),
102109
Result = antidotec_pb_socket: call_infinity(Pid, {req, EncMsg, ?TIMEOUT}),
103110
case Result of
104111
{error, timeout} -> {error, timeout};
105112
_ ->
106113
case antidote_pb_codec: decode_response(Result) of
107-
{opresponse, ok} -> ok;
108-
{error, Reason} -> {error, Reason};
114+
{operation_response, ok} -> ok;
115+
{operation_response, {error, Reason}} -> {error, Reason};
116+
{error_response, Reason} -> {error, Reason};
109117
Other -> {error, Other}
110118
end
111119
end;
112120

113121
update_objects(Pid, Updates, {static, TxId}) ->
114122
{Clock, Properties} = TxId,
115-
EncMsg = antidote_pb_codec:encode(static_update_objects,
116-
{Clock, Properties, Updates}),
123+
EncMsg = antidote_pb_codec:encode_request({static_update_objects, Clock, Properties, Updates}),
117124
Result = antidotec_pb_socket:call_infinity(Pid, {req, EncMsg, ?TIMEOUT}),
118125
case Result of
119126
{error, timeout} -> {error, timeout};
120127
_ ->
121128
case antidote_pb_codec:decode_response(Result) of
122-
{commit_transaction, CommitTimeStamp} ->
129+
{commit_response, {ok, CommitTimeStamp}} ->
123130
antidotec_pb_socket:store_commit_time(Pid, CommitTimeStamp),
124131
ok;
125-
{error, Reason} -> {error, Reason};
132+
{commit_response, {error, Reason}} ->
133+
{error, Reason};
134+
{error_response, Reason} -> {error, Reason};
126135
Other -> {error, Other}
127136
end
128137
end.
129138

130-
-spec read_objects(Pid::term(), Objects::[term()], TxId::term()) -> {ok, [term()]} | {error, term()}.
139+
-spec read_objects(Pid::pid(), Objects::[term()], {interactive | static, TxId::binary()}) -> {ok, [term()]} | {error, term()}.
131140
read_objects(Pid, Objects, Transaction) ->
132141
case read_values(Pid, Objects, Transaction) of
133142
{ok, Values} ->
@@ -141,33 +150,34 @@ read_objects(Pid, Objects, Transaction) ->
141150
Other
142151
end.
143152

144-
-spec read_values(Pid::term(), Objects::[term()], TxId::term()) -> {ok, [term()]} | {error, term()}.
153+
-spec read_values(Pid::pid(), Objects::[term()], {interactive | static, TxId::binary()}) -> {ok, [term()]} | {error, term()}.
145154
read_values(Pid, Objects, {interactive, TxId}) ->
146-
EncMsg = antidote_pb_codec:encode(read_objects, {Objects, TxId}),
155+
EncMsg = antidote_pb_codec:encode_request({read_objects, Objects, TxId}),
147156
Result = antidotec_pb_socket:call_infinity(Pid, {req, EncMsg, ?TIMEOUT}),
148157
case Result of
149158
{error, timeout} -> {error, timeout};
150159
_ ->
151160
case antidote_pb_codec:decode_response(Result) of
152-
{read_objects, Values} ->
161+
{read_objects_response, {ok, Values}} ->
153162
{ok, Values};
154-
{error, Reason} -> {error, Reason};
163+
{read_objects_response, {error, Reason}} ->
164+
{error, Reason};
165+
{error_response, Reason} -> {error, Reason};
155166
Other -> {error, Other}
156167
end
157168
end;
158169
read_values(Pid, Objects, {static, TxId}) ->
159170
{Clock, Properties} = TxId,
160-
EncMsg = antidote_pb_codec:encode(static_read_objects,
161-
{Clock, Properties, Objects}),
171+
EncMsg = antidote_pb_codec:encode_request({static_read_objects, Clock, Properties, Objects}),
162172
Result = antidotec_pb_socket:call_infinity(Pid, {req, EncMsg, ?TIMEOUT}),
163173
case Result of
164174
{error, timeout} -> {error, timeout};
165175
_ ->
166176
case antidote_pb_codec:decode_response(Result) of
167-
{static_read_objects_resp, Values, CommitTimeStamp} ->
177+
{static_read_objects_response, {Values, CommitTimeStamp}} ->
168178
antidotec_pb_socket:store_commit_time(Pid, CommitTimeStamp),
169179
{ok, Values};
170-
{error, Reason} -> {error, Reason}
180+
{error_response, Reason} -> {error, Reason}
171181
end
172182
end.
173183

src/antidotec_pb_socket.erl

Lines changed: 13 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,8 @@
3030
-type address() :: string() | atom() | inet:ip_address().
3131
%% The TCP port number of the Riak node's Protocol Buffers interface
3232
-type portnum() :: non_neg_integer().
33-
-type msg_id() :: non_neg_integer().
34-
-type rpb_req() :: {tunneled, msg_id(), binary()} | atom() | tuple().
3533

36-
-record(request, {ref :: reference(), msg :: rpb_req(), from, timeout :: timeout(),
34+
-record(request, {ref :: reference(), from, timeout :: timeout(),
3735
tref :: reference() | undefined }).
3836

3937
-record(state, {
@@ -111,7 +109,17 @@ get_last_commit_time(Pid) ->
111109

112110
%% @private
113111
handle_call({req, Msg, Timeout}, From, State) ->
114-
{noreply, send_request(new_request(Msg, From, Timeout), State)};
112+
Ref = make_ref(),
113+
Req = #request{ref = Ref, from = From, timeout = Timeout,
114+
tref = create_req_timer(Timeout, Ref)},
115+
NewState = case gen_tcp:send(State#state.sock, Msg) of
116+
ok ->
117+
maybe_reply({noreply, State#state{active = Req}});
118+
{error, Reason} ->
119+
logger:warning("Socket error while sending request: ~p.", [Reason]),
120+
gen_tcp:close(State#state.sock)
121+
end,
122+
{noreply, NewState};
115123

116124
handle_call({store_commit_time, TimeStamp}, _From, State) ->
117125
{reply, ok, State#state{last_commit_time = TimeStamp}};
@@ -126,10 +134,8 @@ handle_call(stop, _From, State) ->
126134
%% @private
127135
%% @todo handle timeout
128136
handle_info({_Proto, Sock, Data}, State=#state{active = (Active = #request{})}) ->
129-
<<MsgCode:8, MsgData/binary>> = Data,
130-
Response = antidote_pb_codec:decode_msg(MsgCode, MsgData),
131137
cancel_req_timer(Active#request.tref),
132-
_ = send_caller(Response, Active),
138+
_ = send_caller(Data, Active),
133139
NewState = State#state{active = undefined},
134140
ok = inet:setopts(Sock, [{active, once}]),
135141
{noreply, NewState};
@@ -190,13 +196,6 @@ disconnect(State) ->
190196
NewState = State#state{sock = undefined, active = undefined},
191197
{stop, disconnected, NewState}.
192198

193-
194-
%% @private
195-
new_request(Msg, From, Timeout) ->
196-
Ref = make_ref(),
197-
#request{ref = Ref, msg = Msg, from = From, timeout = Timeout,
198-
tref = create_req_timer(Timeout, Ref)}.
199-
200199
%% @private
201200
%% Create a request timer if desired, otherwise return undefined.
202201
create_req_timer(infinity, _Ref) ->
@@ -206,22 +205,6 @@ create_req_timer(undefined, _Ref) ->
206205
create_req_timer(Msecs, Ref) ->
207206
erlang:send_after(Msecs, self(), {req_timeout, Ref}).
208207

209-
%% Send a request to the server and prepare the state for the response
210-
%% @private
211-
send_request(Request0, State) when State#state.active =:= undefined ->
212-
{Request, Pkt} = encode_request_message(Request0),
213-
case gen_tcp:send(State#state.sock, Pkt) of
214-
ok ->
215-
maybe_reply({noreply, State#state{active = Request}});
216-
{error, Reason} ->
217-
logger:warning("Socket error while sending request: ~p.", [Reason]),
218-
gen_tcp:close(State#state.sock)
219-
end.
220-
221-
%% Unencoded Request (the normal PB client path)
222-
encode_request_message(#request{msg=Msg}=Req) ->
223-
EncMsg = antidote_pb_codec:encode_msg(Msg),
224-
{Req, EncMsg}.
225208

226209
%% maybe_reply({reply, Reply, State = #state{active = Request}}) ->
227210
%% NewRequest = send_caller(Reply, Request),

0 commit comments

Comments
 (0)