26
26
import javax .jms .TextMessage ;
27
27
28
28
import org .apache .kafka .connect .data .Schema ;
29
+ import org .apache .kafka .connect .data .SchemaBuilder ;
30
+ import org .apache .kafka .connect .data .Struct ;
29
31
import org .apache .kafka .connect .sink .SinkRecord ;
30
32
import org .json .JSONObject ;
31
33
import org .junit .Before ;
@@ -66,8 +68,26 @@ public void buildTextMessageWithSchema() throws Exception {
66
68
}
67
69
68
70
@ Test
69
- public void buildJsonMessageWithoutSchema () throws Exception {
70
- Object testObject = generateComplexObject ();
71
+ public void buildStructMessage () throws Exception {
72
+ Struct testObject = generateComplexObjectAsStruct ();
73
+ Schema testSchema = testObject .schema ();
74
+
75
+ Message message = builder .fromSinkRecord (getJmsContext (), generateSinkRecord (testSchema , testObject ));
76
+ String contents = message .getBody (String .class );
77
+
78
+ JSONObject jsonContents = new JSONObject (contents );
79
+ assertEquals (3 , jsonContents .length ());
80
+ assertEquals ("this is a string" , jsonContents .getString ("mystring" ));
81
+ assertEquals (true , jsonContents .getJSONObject ("myobj" ).getBoolean ("mybool" ));
82
+ assertEquals (12345 , jsonContents .getJSONObject ("myobj" ).getInt ("myint" ));
83
+ assertEquals (12.4 , jsonContents .getJSONObject ("myobj" ).getDouble ("myfloat" ), 0.0001 );
84
+ assertEquals (4 , jsonContents .getJSONArray ("myarray" ).length ());
85
+ assertEquals ("first" , jsonContents .getJSONArray ("myarray" ).getString (0 ));
86
+ }
87
+
88
+ @ Test
89
+ public void buildMapMessage () throws Exception {
90
+ Object testObject = generateComplexObjectAsMap ();
71
91
72
92
Message message = builder .fromSinkRecord (getJmsContext (), generateSinkRecord (null , testObject ));
73
93
String contents = message .getBody (String .class );
@@ -83,7 +103,42 @@ public void buildJsonMessageWithoutSchema() throws Exception {
83
103
}
84
104
85
105
86
- private Object generateComplexObject () {
106
+ private Struct generateComplexObjectAsStruct () {
107
+ Schema innerSchema = SchemaBuilder .struct ()
108
+ .name ("com.ibm.eventstreams.tests.Inner" )
109
+ .field ("mybool" , Schema .BOOLEAN_SCHEMA )
110
+ .field ("myint" , Schema .INT32_SCHEMA )
111
+ .field ("myfloat" , Schema .FLOAT32_SCHEMA )
112
+ .field ("mybytes" , Schema .BYTES_SCHEMA )
113
+ .build ();
114
+
115
+ Schema complexSchema = SchemaBuilder .struct ()
116
+ .name ("com.ibm.eventstreams.tests.Complex" )
117
+ .field ("mystring" , Schema .STRING_SCHEMA )
118
+ .field ("myobj" , innerSchema )
119
+ .field ("myarray" , SchemaBuilder .array (Schema .STRING_SCHEMA ))
120
+ .build ();
121
+
122
+ List <String > innerary = new ArrayList <>();
123
+ innerary .add ("first" );
124
+ innerary .add ("second" );
125
+ innerary .add ("third" );
126
+ innerary .add ("fourth" );
127
+
128
+ Struct obj = new Struct (complexSchema )
129
+ .put ("mystring" , "this is a string" )
130
+ .put ("myobj" ,
131
+ new Struct (innerSchema )
132
+ .put ("mybool" , true )
133
+ .put ("myint" , 12345 )
134
+ .put ("myfloat" , 12.4f )
135
+ .put ("mybytes" , "Hello" .getBytes ()))
136
+ .put ("myarray" , innerary );
137
+
138
+ return obj ;
139
+ }
140
+
141
+ private Map <String , Object > generateComplexObjectAsMap () {
87
142
Map <String , Object > obj = new HashMap <>();
88
143
89
144
obj .put ("mystring" , "this is a string" );
0 commit comments