@@ -41,14 +41,30 @@ class Bulk
41
41
*/
42
42
public $ body = [];
43
43
44
+ /**
45
+ * Number of pending operations
46
+ * @var int
47
+ */
48
+ public $ operationCount = 0 ;
49
+
50
+ /**
51
+ * Operation count which will trigger autocommit
52
+ * @var int
53
+ */
54
+ public $ autocommitAfter = 0 ;
55
+
44
56
45
57
/**
46
58
* Bulk constructor.
47
59
* @param Query $query
60
+ * @param int $autocommitAfter
48
61
*/
49
- public function __construct (Query $ query )
62
+ public function __construct (Query $ query, $ autocommitAfter = 0 )
50
63
{
64
+
51
65
$ this ->query = $ query ;
66
+ $ this ->autocommitAfter = intval ($ autocommitAfter );
67
+
52
68
}
53
69
54
70
/**
@@ -125,26 +141,70 @@ public function id($_id = false)
125
141
}
126
142
127
143
/**
128
- * Add pending document
144
+ * Add pending document for insert
129
145
* @param array $data
146
+ * @return mixed
130
147
*/
131
148
public function insert ($ data = [])
132
149
{
133
150
151
+ return $ this ->action ('index ' , $ data );
152
+
153
+ }
154
+
155
+ /**
156
+ * Add pending document for update
157
+ * @param array $data
158
+ * @return mixed
159
+ */
160
+ public function update ($ data = [])
161
+ {
162
+
163
+ return $ this ->action ('update ' , $ data );
164
+
165
+ }
166
+
167
+ /**
168
+ * Add pending document for deletion
169
+ */
170
+ public function delete ()
171
+ {
172
+
173
+ return $ this ->action ('delete ' );
174
+
175
+ }
176
+
177
+ /**
178
+ * Add pending document abstract action
179
+ * @param string $actionType
180
+ * @param array $data
181
+ * @return mixed
182
+ */
183
+ public function action ($ actionType , $ data = [])
184
+ {
134
185
$ this ->body ["body " ][] = [
135
186
136
- ' index ' => [
187
+ $ actionType => [
137
188
'_index ' => $ this ->getIndex (),
138
189
'_type ' => $ this ->getType (),
139
190
'_id ' => $ this ->_id
140
191
]
141
192
142
193
];
143
194
144
- $ this ->body ["body " ][] = $ data ;
195
+ if (!empty ($ data )) {
196
+ $ this ->body ["body " ][] = $ data ;
197
+ }
198
+
199
+ $ this ->operationCount ++;
145
200
146
201
$ this ->reset ();
147
202
203
+ if ($ this ->autocommitAfter > 0 && $ this ->operationCount >= $ this ->autocommitAfter ) {
204
+ return $ this ->commit ();
205
+ }
206
+
207
+ return true ;
148
208
}
149
209
150
210
/**
@@ -170,5 +230,20 @@ public function reset()
170
230
171
231
}
172
232
233
+ /**
234
+ * Commit all pending operations
235
+ */
236
+ public function commit ()
237
+ {
173
238
239
+ if (empty ($ this ->body )) {
240
+ return false ;
241
+ }
242
+
243
+ $ result = $ this ->query ->connection ->bulk ($ this ->body );
244
+ $ this ->operationCount = 0 ;
245
+ $ this ->body = [];
246
+
247
+ return $ result ;
248
+ }
174
249
}
0 commit comments