3
3
import com .google .common .annotations .VisibleForTesting ;
4
4
import com .kintone .client .api .record .CreateCursorResponseBody ;
5
5
import com .kintone .client .api .record .GetRecordsByCursorResponseBody ;
6
+ import com .kintone .client .model .app .field .FieldProperty ;
7
+ import com .kintone .client .model .record .FieldType ;
6
8
import com .kintone .client .model .record .Record ;
9
+ import com .kintone .client .model .record .TableRow ;
7
10
import org .embulk .config .ConfigDiff ;
8
11
import org .embulk .config .ConfigSource ;
9
12
import org .embulk .config .TaskReport ;
13
16
import org .embulk .spi .PageBuilder ;
14
17
import org .embulk .spi .PageOutput ;
15
18
import org .embulk .spi .Schema ;
19
+ import org .embulk .spi .Schema .Builder ;
20
+ import org .embulk .spi .type .Type ;
21
+ import org .embulk .spi .type .Types ;
16
22
import org .embulk .util .config .ConfigMapper ;
17
23
import org .embulk .util .config .ConfigMapperFactory ;
18
24
import org .embulk .util .config .TaskMapper ;
19
25
import org .embulk .util .config .modules .TimestampModule ;
20
26
import org .slf4j .Logger ;
21
27
import org .slf4j .LoggerFactory ;
22
28
29
+ import java .util .ArrayList ;
23
30
import java .util .List ;
31
+ import java .util .Map ;
32
+ import java .util .Set ;
33
+ import java .util .TreeMap ;
24
34
25
35
public class KintoneInputPlugin
26
36
implements InputPlugin
@@ -41,6 +51,10 @@ public ConfigDiff transaction(ConfigSource config,
41
51
Schema schema = task .getFields ().toSchema ();
42
52
int taskCount = 1 ; // number of run() method calls
43
53
54
+ if (schema .isEmpty ()) {
55
+ schema = buildSchema (task );
56
+ }
57
+
44
58
return resume (task .toTaskSource (), schema , taskCount , control );
45
59
}
46
60
@@ -74,14 +88,30 @@ public TaskReport run(TaskSource taskSource,
74
88
client .validateAuth (task );
75
89
client .connect (task );
76
90
77
- CreateCursorResponseBody cursor = client .createCursor (task );
91
+ CreateCursorResponseBody cursor = client .createCursor (task , schema );
78
92
GetRecordsByCursorResponseBody cursorResponse = new GetRecordsByCursorResponseBody (true , null );
79
93
94
+ List <String > subTableFieldCodes = null ;
95
+ if (task .getExpandSubtable ()) {
96
+ subTableFieldCodes = client .getFieldCodes (task , FieldType .SUBTABLE );
97
+ }
98
+
80
99
while (cursorResponse .isNext ()) {
81
100
cursorResponse = client .getRecordsByCursor (cursor .getId ());
82
101
for (Record record : cursorResponse .getRecords ()) {
83
- schema .visitColumns (new KintoneInputColumnVisitor (new KintoneAccessor (record ), pageBuilder , task ));
84
- pageBuilder .addRecord ();
102
+ List <Record > records ;
103
+ if (task .getExpandSubtable ()) {
104
+ records = expandSubtable (record , subTableFieldCodes );
105
+ }
106
+ else {
107
+ records = new ArrayList <>();
108
+ records .add (record );
109
+ }
110
+
111
+ for (Record expandedRecord : records ) {
112
+ schema .visitColumns (new KintoneInputColumnVisitor (new KintoneAccessor (expandedRecord ), pageBuilder , task ));
113
+ pageBuilder .addRecord ();
114
+ }
85
115
}
86
116
pageBuilder .flush ();
87
117
}
@@ -113,4 +143,95 @@ protected KintoneClient getKintoneClient()
113
143
{
114
144
return new KintoneClient ();
115
145
}
146
+
147
+ private List <Record > expandSubtable (final Record originalRecord , final List <String > subTableFieldCodes )
148
+ {
149
+ ArrayList <Record > records = new ArrayList <>();
150
+ records .add (cloneRecord (originalRecord ));
151
+ for (String fieldCode : subTableFieldCodes ) {
152
+ List <TableRow > tableRows = originalRecord .getSubtableFieldValue (fieldCode );
153
+ for (int idx = 0 ; idx < tableRows .size (); idx ++) {
154
+ if (records .size () < idx + 1 ) {
155
+ records .add (cloneRecord (originalRecord ));
156
+ }
157
+
158
+ TableRow tableRow = tableRows .get (idx );
159
+ Record currentRecord = records .get (idx );
160
+ Set <String > tableFieldCodes = tableRow .getFieldCodes ();
161
+ for (String tableFieldCode : tableFieldCodes ) {
162
+ currentRecord .putField (tableFieldCode , tableRow .getFieldValue (tableFieldCode ));
163
+ }
164
+ }
165
+ }
166
+ return records ;
167
+ }
168
+
169
+ private Record cloneRecord (final Record src )
170
+ {
171
+ Record dst = new Record (src .getId (), src .getRevision ());
172
+ for (String fieldCode : src .getFieldCodes (true )) {
173
+ dst .putField (fieldCode , src .getFieldValue (fieldCode ));
174
+ }
175
+ return dst ;
176
+ }
177
+
178
+ private Schema buildSchema (final PluginTask task )
179
+ {
180
+ KintoneClient client = getKintoneClient ();
181
+ client .validateAuth (task );
182
+ client .connect (task );
183
+
184
+ Map <String , FieldProperty > fields = new TreeMap <>(client .getFields (task ));
185
+ Builder builder = Schema .builder ();
186
+
187
+ // built in schema
188
+ builder .add ("$id" , Types .LONG );
189
+ builder .add ("$revision" , Types .LONG );
190
+
191
+ for (Map .Entry <String , FieldProperty > fieldEntry : fields .entrySet ()) {
192
+ builder .add (fieldEntry .getKey (), buildType (fieldEntry .getValue ().getType ()));
193
+ }
194
+
195
+ return builder .build ();
196
+ }
197
+
198
+ private Type buildType (final FieldType fieldType )
199
+ {
200
+ switch (fieldType ) {
201
+ case __ID__ :
202
+ case __REVISION__ :
203
+ case RECORD_NUMBER :
204
+ return Types .LONG ;
205
+ case CALC :
206
+ case NUMBER :
207
+ return Types .DOUBLE ;
208
+ case CREATED_TIME :
209
+ case DATETIME :
210
+ case UPDATED_TIME :
211
+ return Types .TIMESTAMP ;
212
+ case SUBTABLE :
213
+ return Types .JSON ;
214
+ case CATEGORY :
215
+ case CHECK_BOX :
216
+ case CREATOR :
217
+ case DATE :
218
+ case DROP_DOWN :
219
+ case FILE :
220
+ case GROUP_SELECT :
221
+ case LINK :
222
+ case MODIFIER :
223
+ case MULTI_LINE_TEXT :
224
+ case MULTI_SELECT :
225
+ case ORGANIZATION_SELECT :
226
+ case RADIO_BUTTON :
227
+ case RICH_TEXT :
228
+ case SINGLE_LINE_TEXT :
229
+ case STATUS :
230
+ case STATUS_ASSIGNEE :
231
+ case TIME :
232
+ case USER_SELECT :
233
+ default :
234
+ return Types .STRING ;
235
+ }
236
+ }
116
237
}
0 commit comments