@@ -78,7 +78,7 @@ public class Concentrator extends Actuator implements IConcentrator {
78
78
private static final long DEFAULT_VALUE_OF_DEFAULT_LAN_EXECUTION_TIMEOUT = 1000 * 10 ;
79
79
private static final int DEFAULT_LAN_EXECUTION_TIMEOUT_CHECK_INTERVAL = 500 ;
80
80
81
- private static final int DEFAULT_NODE_ADDITION_TIMEOUT = 1000 * 60 * 5 ;
81
+ private static final long DEFAULT_ADD_NODE_TIMEOUT = 1000 * 60 * 5 ;
82
82
83
83
private static final int DEFAULT_ACK_REQUIRED_LAN_NOTIFICATIONS_POOL_SIZE = 512 ;
84
84
@@ -100,6 +100,7 @@ public class Concentrator extends Actuator implements IConcentrator {
100
100
protected Map <Integer , LanNode > nodes ;
101
101
protected Map <String , LanNode > confirmingNodes ;
102
102
protected Object nodesLock ;
103
+ protected long addNodeTimeout ;
103
104
104
105
protected Map <String , IThingModelDescriptor > lanThingModelDescriptors = new HashMap <>();
105
106
@@ -127,7 +128,7 @@ public class Concentrator extends Actuator implements IConcentrator {
127
128
protected IFollowProcessor followProcessor ;
128
129
129
130
protected QoS defaultDataQoS ;
130
- protected Map <Class <?>, QoS > datumTypeToQoSs ;
131
+ protected Map <Class <?>, QoS > dataTypeToQoSs ;
131
132
132
133
public Concentrator (IChatServices chatServices ) {
133
134
super (chatServices );
@@ -145,6 +146,8 @@ public Concentrator(IChatServices chatServices) {
145
146
confirmingNodes = new LinkedHashMap <>();
146
147
nodesLock = new Object ();
147
148
149
+ addNodeTimeout = DEFAULT_ADD_NODE_TIMEOUT ;
150
+
148
151
lanRouting = false ;
149
152
150
153
ackRequiredLanNotificationsPoolSize = DEFAULT_ACK_REQUIRED_LAN_NOTIFICATIONS_POOL_SIZE ;
@@ -160,7 +163,7 @@ public Concentrator(IChatServices chatServices) {
160
163
lanReportPreprocessors = new ArrayList <>();
161
164
162
165
defaultDataQoS = QoS .AT_MOST_ONCE ;
163
- datumTypeToQoSs = new HashMap <>();
166
+ dataTypeToQoSs = new HashMap <>();
164
167
165
168
INotificationService notificationService = chatServices .createApi (INotificationService .class );
166
169
notifier = notificationService .getNotifier ();
@@ -240,7 +243,7 @@ public void registerLanThingModel(IThingModelDescriptor modelDescriptor) {
240
243
}
241
244
242
245
for (Entry <Protocol , Class <?>> entry : modelDescriptor .getSupportedData ().entrySet ()) {
243
- registerLanDatum (entry .getValue ());
246
+ registerLanData (entry .getValue ());
244
247
}
245
248
246
249
if (logger .isInfoEnabled ())
@@ -267,10 +270,10 @@ protected void registerLanFollowedEvent(Protocol protocol, Class<?> eventType) {
267
270
ObxFactory .getInstance ().registerLanFollowedEvent (eventType );
268
271
}
269
272
270
- protected void registerLanDatum (Class <?> datumType ) {
271
- ObxFactory .getInstance ().registerLanDatum ( datumType );
273
+ protected void registerLanData (Class <?> dataType ) {
274
+ ObxFactory .getInstance ().registerLanData ( dataType );
272
275
273
- reporter .registerSupportedDatum ( datumType );
276
+ reporter .registerSupportedData ( dataType );
274
277
}
275
278
276
279
@ Override
@@ -502,7 +505,7 @@ protected void processLanReport(LanReport lanReport) {
502
505
ackRequiredLanReportsPool .put (traceId , null );
503
506
}
504
507
505
- QoS qos = getDatumQoS (lanReport .getData ().getClass ());
508
+ QoS qos = getDataQoS (lanReport .getData ().getClass ());
506
509
if (qos == null )
507
510
qos = defaultDataQoS ;
508
511
@@ -754,10 +757,10 @@ public void requestServerToAddNode(String thingId, String registrationCode, int
754
757
755
758
if (nodes .size () > (MAX_LAN_SIZE - 1 )) {
756
759
if (logger .isErrorEnabled ()) {
757
- logger .error ("Node size overflow." );
760
+ logger .error ("Nodes size overflow." );
758
761
}
759
762
760
- processNodeAdditionError (AddNodeError .SIZE_OVERFLOW , node );
763
+ processAddNodeError (AddNodeError .SIZE_OVERFLOW , node );
761
764
762
765
return ;
763
766
}
@@ -768,7 +771,7 @@ public void requestServerToAddNode(String thingId, String registrationCode, int
768
771
logger .error ("Reduplicate thing ID: {}." , node .getThingId ());
769
772
}
770
773
771
- processNodeAdditionError (AddNodeError .REDUPLICATE_THING_ID , node );
774
+ processAddNodeError (AddNodeError .REDUPLICATE_THING_ID , node );
772
775
return ;
773
776
}
774
777
@@ -777,7 +780,7 @@ public void requestServerToAddNode(String thingId, String registrationCode, int
777
780
logger .error ("Reduplicate thing address: {}." , node .getAddress ());
778
781
}
779
782
780
- processNodeAdditionError (AddNodeError .REDUPLICATE_THING_ADDRESS , node );
783
+ processAddNodeError (AddNodeError .REDUPLICATE_THING_ADDRESS , node );
781
784
return ;
782
785
}
783
786
@@ -786,24 +789,24 @@ public void requestServerToAddNode(String thingId, String registrationCode, int
786
789
logger .error ("Reduplicate thing LAN ID: {}." , node .getLanId ());
787
790
}
788
791
789
- processNodeAdditionError (AddNodeError .REDUPLICATE_LAN_ID , node );
792
+ processAddNodeError (AddNodeError .REDUPLICATE_LAN_ID , node );
790
793
return ;
791
794
}
792
795
}
793
796
794
- chatServices .getTaskService ().execute (new NodeAdditionTask (node ));
797
+ chatServices .getTaskService ().execute (new AddNodeTask (node ));
795
798
796
799
if (logger .isInfoEnabled ()) {
797
- logger .info ("Node addition request for node which's thingID is '{}' and address is '{}' has sent." ,
800
+ logger .info ("Add node request for node which's thingID is '{}' and address is '{}' has sent." ,
798
801
thingId , address );
799
802
}
800
803
}
801
804
}
802
805
803
- private class NodeAdditionTask implements ITask <Iq > {
806
+ private class AddNodeTask implements ITask <Iq > {
804
807
private LanNode node ;
805
808
806
- public NodeAdditionTask (LanNode node ) {
809
+ public AddNodeTask (LanNode node ) {
807
810
this .node = node ;
808
811
}
809
812
@@ -817,13 +820,13 @@ public void trigger(IUnidirectionalStream<Iq> stream) {
817
820
addNode .setAddress (node .getAddress ());
818
821
819
822
Iq iq = new Iq (Iq .Type .SET , addNode );
820
- stream .send (iq , DEFAULT_NODE_ADDITION_TIMEOUT );
823
+ stream .send (iq , addNodeTimeout );
821
824
822
825
synchronized (nodesLock ) {
823
826
confirmingNodes .put (iq .getId (), node );
824
827
}
825
828
}
826
-
829
+
827
830
@ Override
828
831
public void processResponse (IUnidirectionalStream <Iq > stream , Iq iq ) {
829
832
NodeAdded nodeAdded = iq .getObject ();
@@ -834,7 +837,7 @@ public void processResponse(IUnidirectionalStream<Iq> stream, Iq iq) {
834
837
logger .error ("Confirming node which's thing ID is '{}' not found." , nodeAdded .getNodeThingId ());
835
838
}
836
839
837
- processNodeAdditionError (IConcentrator .AddNodeError .ADDED_NODE_NOT_FOUND , node );
840
+ processAddNodeError (IConcentrator .AddNodeError .ADDED_NODE_NOT_FOUND , node );
838
841
return ;
839
842
}
840
843
@@ -847,7 +850,7 @@ public void processResponse(IUnidirectionalStream<Iq> stream, Iq iq) {
847
850
getThingName (), confirmingNode .getThingId (), nodeAdded .getNodeThingId (), nodeAdded .getConcentratorThingName ());
848
851
}
849
852
850
- processNodeAdditionError (IConcentrator .AddNodeError .BAD_NODE_ADDITION_RESPONSE , node );
853
+ processAddNodeError (IConcentrator .AddNodeError .BAD_NODE_ADDITION_RESPONSE , node );
851
854
return ;
852
855
}
853
856
@@ -856,7 +859,7 @@ public void processResponse(IUnidirectionalStream<Iq> stream, Iq iq) {
856
859
logger .error ("Bad node addition response. The server changed LAN ID of node to {}." , nodeAdded .getLanId ());
857
860
}
858
861
859
- processNodeAdditionError (IConcentrator .AddNodeError .SERVER_CHANGED_LAN_ID , node );
862
+ processAddNodeError (IConcentrator .AddNodeError .SERVER_CHANGED_LAN_ID , node );
860
863
return ;
861
864
}
862
865
@@ -888,15 +891,15 @@ public boolean processError(IUnidirectionalStream<Iq> stream, StanzaError error)
888
891
}
889
892
890
893
if (ItemNotFound .DEFINED_CONDITION .equals (error .getDefinedCondition ())) {
891
- processNodeAdditionError (IConcentrator .AddNodeError .NO_SUCH_CONCENTRATOR , node );
894
+ processAddNodeError (IConcentrator .AddNodeError .NO_SUCH_CONCENTRATOR , node );
892
895
} else if (ServiceUnavailable .DEFINED_CONDITION .equals (error .getDefinedCondition ())) {
893
- processNodeAdditionError (IConcentrator .AddNodeError .NOT_CONCENTRATOR , node );
896
+ processAddNodeError (IConcentrator .AddNodeError .NOT_CONCENTRATOR , node );
894
897
} else if (Conflict .DEFINED_CONDITION .equals (error .getDefinedCondition ())) {
895
- processNodeAdditionError (IConcentrator .AddNodeError .REDUPLICATE_NODE_OR_LAN_ID , node );
898
+ processAddNodeError (IConcentrator .AddNodeError .REDUPLICATE_NODE_OR_LAN_ID , node );
896
899
} else if (NotAcceptable .DEFINED_CONDITION .equals (error .getDefinedCondition ())) {
897
- processNodeAdditionError (IConcentrator .AddNodeError .NOT_UNREGISTERED_THING , node );
900
+ processAddNodeError (IConcentrator .AddNodeError .NOT_UNREGISTERED_THING , node );
898
901
} else {
899
- processNodeAdditionError (IConcentrator .AddNodeError .UNKNOWN_ERROR , node );
902
+ processAddNodeError (IConcentrator .AddNodeError .UNKNOWN_ERROR , node );
900
903
}
901
904
} finally {
902
905
synchronized (nodesLock ) {
@@ -915,7 +918,7 @@ public boolean processTimeout(IUnidirectionalStream<Iq> stream, Iq iq) {
915
918
916
919
nodes .remove (node .getLanId ());
917
920
918
- processNodeAdditionError (IConcentrator .AddNodeError .REMOTE_SERVER_TIMEOUT , node );
921
+ processAddNodeError (IConcentrator .AddNodeError .REMOTE_SERVER_TIMEOUT , node );
919
922
920
923
return true ;
921
924
}
@@ -934,7 +937,7 @@ public void removeNode(int lanId) {
934
937
}
935
938
}
936
939
937
- private void processNodeAdditionError (IConcentrator .AddNodeError error , LanNode node ) {
940
+ private void processAddNodeError (IConcentrator .AddNodeError error , LanNode node ) {
938
941
for (IConcentrator .Listener listener : listeners ) {
939
942
listener .occurred (error , node );
940
943
}
@@ -1108,20 +1111,20 @@ public String getThingName() {
1108
1111
}
1109
1112
1110
1113
@ Override
1111
- public boolean isLanDatumSupported (String model , Protocol protocol ) {
1114
+ public boolean isLanDataSupported (String model , Protocol protocol ) {
1112
1115
if (!lanThingModelDescriptors .containsKey (model ))
1113
1116
throw new IllegalArgumentException (String .format ("Unsupported model: '%s'" , model ));
1114
1117
1115
1118
return lanThingModelDescriptors .get (model ).getSupportedData ().containsKey (protocol );
1116
1119
}
1117
1120
1118
1121
@ Override
1119
- public Class <?> getLanDatumType (String model , Protocol protocol ) {
1122
+ public Class <?> getLanDataType (String model , Protocol protocol ) {
1120
1123
if (!lanThingModelDescriptors .containsKey (model ))
1121
1124
throw new IllegalArgumentException (String .format ("Unsupported model: '%s'" , model ));
1122
1125
1123
1126
if (!lanThingModelDescriptors .get (model ).getSupportedData ().containsKey (protocol )) {
1124
- throw new IllegalArgumentException (String .format ("Unsupported datum which's protocol is '%s' for thing which's model is '%s'." , protocol , model ));
1127
+ throw new IllegalArgumentException (String .format ("Unsupported data which's protocol is '%s' for thing which's model is '%s'." , protocol , model ));
1125
1128
}
1126
1129
1127
1130
return lanThingModelDescriptors .get (model ).getSupportedData ().get (protocol );
@@ -1176,11 +1179,11 @@ public Class<?> getLanEventType(String model, Protocol protocol) {
1176
1179
}
1177
1180
1178
1181
@ Override
1179
- public boolean isLanDatumSupported (String model , Class <?> datumType ) {
1182
+ public boolean isLanDataSupported (String model , Class <?> dataType ) {
1180
1183
if (!lanThingModelDescriptors .containsKey (model ))
1181
1184
throw new IllegalArgumentException (String .format ("Unsupported model: '%s'" , model ));
1182
1185
1183
- return lanThingModelDescriptors .get (model ).getSupportedData ().containsValue (datumType );
1186
+ return lanThingModelDescriptors .get (model ).getSupportedData ().containsValue (dataType );
1184
1187
}
1185
1188
1186
1189
@ Override
@@ -1564,12 +1567,22 @@ public QoS getDefaultDataQoS() {
1564
1567
}
1565
1568
1566
1569
@ Override
1567
- public void setDatumQoS (Class <?> datumType , QoS qos ) {
1568
- datumTypeToQoSs .put (datumType , qos );
1570
+ public void setDataQoS (Class <?> dataType , QoS qos ) {
1571
+ dataTypeToQoSs .put (dataType , qos );
1572
+ }
1573
+
1574
+ @ Override
1575
+ public QoS getDataQoS (Class <?> dataType ) {
1576
+ return dataTypeToQoSs .get (dataType );
1577
+ }
1578
+
1579
+ @ Override
1580
+ public void setAddNodeTimeout (long addNodeTimeout ) {
1581
+ this .addNodeTimeout = addNodeTimeout ;
1569
1582
}
1570
1583
1571
1584
@ Override
1572
- public QoS getDatumQoS ( Class <?> datumType ) {
1573
- return datumTypeToQoSs . get ( datumType ) ;
1585
+ public long getAddNodeTimeout ( ) {
1586
+ return addNodeTimeout ;
1574
1587
}
1575
1588
}
0 commit comments