@@ -63,25 +63,29 @@ public class RPCTransaction implements Transaction {
63
63
private final AtomicBoolean isOpen ;
64
64
65
65
RPCTransaction (RPCSession session , ByteString sessionId , Type type , GraknOptions options ) {
66
- this .type = type ;
67
- conceptManager = new ConceptManager (this );
68
- logicManager = new LogicManager (this );
69
- queryManager = new QueryManager (this );
70
- collectors = new ResponseCollectors ();
71
-
72
- // Opening the StreamObserver exposes these atomics to another thread, so we must initialize them first.
73
- isOpen = new AtomicBoolean (true );
74
- requestObserver = GraknGrpc .newStub (session .channel ()).transaction (responseObserver ());
75
- final TransactionProto .Transaction .Req .Builder openRequest = TransactionProto .Transaction .Req .newBuilder ()
76
- .putAllMetadata (tracingData ())
77
- .setOpenReq (TransactionProto .Transaction .Open .Req .newBuilder ()
78
- .setSessionId (sessionId )
79
- .setType (TransactionProto .Transaction .Type .forNumber (type .id ()))
80
- .setOptions (options (options )));
81
- final Instant startTime = Instant .now ();
82
- final TransactionProto .Transaction .Open .Res res = execute (openRequest ).getOpenRes ();
83
- final Instant endTime = Instant .now ();
84
- networkLatencyMillis = (int ) ChronoUnit .MILLIS .between (startTime , endTime ) - res .getProcessingTimeMillis ();
66
+ try {
67
+ this .type = type ;
68
+ conceptManager = new ConceptManager (this );
69
+ logicManager = new LogicManager (this );
70
+ queryManager = new QueryManager (this );
71
+ collectors = new ResponseCollectors ();
72
+
73
+ // Opening the StreamObserver exposes these atomics to another thread, so we must initialize them first.
74
+ isOpen = new AtomicBoolean (true );
75
+ requestObserver = GraknGrpc .newStub (session .channel ()).transaction (responseObserver ());
76
+ final TransactionProto .Transaction .Req .Builder openRequest = TransactionProto .Transaction .Req .newBuilder ()
77
+ .putAllMetadata (tracingData ())
78
+ .setOpenReq (TransactionProto .Transaction .Open .Req .newBuilder ()
79
+ .setSessionId (sessionId )
80
+ .setType (TransactionProto .Transaction .Type .forNumber (type .id ()))
81
+ .setOptions (options (options )));
82
+ final Instant startTime = Instant .now ();
83
+ final TransactionProto .Transaction .Open .Res res = execute (openRequest ).getOpenRes ();
84
+ final Instant endTime = Instant .now ();
85
+ networkLatencyMillis = (int ) ChronoUnit .MILLIS .between (startTime , endTime ) - res .getProcessingTimeMillis ();
86
+ } catch (StatusRuntimeException e ) {
87
+ throw new GraknClientException (e );
88
+ }
85
89
}
86
90
87
91
@ Override
@@ -137,7 +141,11 @@ public void close() {
137
141
private void close (Response .Done doneResponse ) {
138
142
if (isOpen .compareAndSet (true , false )) {
139
143
collectors .clear (doneResponse );
140
- requestObserver .onCompleted ();
144
+ try {
145
+ requestObserver .onCompleted ();
146
+ } catch (StatusRuntimeException e ) {
147
+ throw new GraknClientException (e );
148
+ }
141
149
}
142
150
}
143
151
0 commit comments